Repository: incubator-beam Updated Branches: refs/heads/master 861562239 -> 442435ed0
Remove InProcessBundle InProcessBundle is an implementation detail of InProcessBundleFactory, which it should be contained within. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3987a05b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3987a05b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3987a05b Branch: refs/heads/master Commit: 3987a05b83de46823efde2742d585cb5b7b98cc3 Parents: 8615622 Author: Thomas Groh <tg...@google.com> Authored: Mon Apr 18 12:55:26 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Tue Apr 19 08:49:39 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/inprocess/InProcessBundle.java | 124 ---------------- .../EncodabilityEnforcementFactoryTest.java | 11 +- .../ImmutabilityEnforcementFactoryTest.java | 8 +- .../inprocess/InMemoryWatermarkManagerTest.java | 2 +- .../runners/inprocess/InProcessBundleTest.java | 145 ------------------- .../InProcessEvaluationContextTest.java | 26 +--- .../inprocess/TransformExecutorTest.java | 6 +- 7 files changed, 23 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java deleted file mode 100644 index d3da9e1..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessBundle.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; - -import javax.annotation.Nullable; - -/** - * A {@link UncommittedBundle} that buffers elements in memory. - */ -public final class InProcessBundle<T> implements UncommittedBundle<T> { - private final PCollection<T> pcollection; - private final boolean keyed; - private final Object key; - private boolean committed = false; - private ImmutableList.Builder<WindowedValue<T>> elements; - - /** - * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key. - */ - public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) { - return new InProcessBundle<T>(pcollection, false, null); - } - - /** - * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified - * key. - * - * See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more - * information. - */ - public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) { - return new InProcessBundle<T>(pcollection, true, key); - } - - private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) { - this.pcollection = pcollection; - this.keyed = keyed; - this.key = key; - this.elements = ImmutableList.builder(); - } - - @Override - public PCollection<T> getPCollection() { - return pcollection; - } - - @Override - public InProcessBundle<T> add(WindowedValue<T> element) { - checkState(!committed, "Can't add element %s to committed bundle %s", element, this); - elements.add(element); - return this; - } - - @Override - public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) { - checkState(!committed, "Can't commit already committed bundle %s", this); - committed = true; - final Iterable<WindowedValue<T>> committedElements = elements.build(); - return new CommittedBundle<T>() { - @Override - @Nullable - public Object getKey() { - return key; - } - - @Override - public boolean isKeyed() { - return keyed; - } - - @Override - public Iterable<WindowedValue<T>> getElements() { - return committedElements; - } - - @Override - public PCollection<T> getPCollection() { - return pcollection; - } - - @Override - public Instant getSynchronizedProcessingOutputWatermark() { - return synchronizedCompletionTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("pcollection", pcollection) - .add("key", key) - .add("elements", committedElements) - .toString(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java index 7720589..85c4322 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java @@ -50,6 +50,7 @@ import java.util.Collections; public class EncodabilityEnforcementFactoryTest { @Rule public ExpectedException thrown = ExpectedException.none(); private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create(); + private BundleFactory bundleFactory = InProcessBundleFactory.create(); @Test public void encodeFailsThrows() { @@ -61,7 +62,7 @@ public class EncodabilityEnforcementFactoryTest { WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now()); ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); thrown.expect(UserCodeException.class); @@ -80,7 +81,7 @@ public class EncodabilityEnforcementFactoryTest { WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record()); CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now()); ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); thrown.expect(UserCodeException.class); @@ -107,7 +108,7 @@ public class EncodabilityEnforcementFactoryTest { }); CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now()); ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); thrown.expect(UserCodeException.class); @@ -131,7 +132,7 @@ public class EncodabilityEnforcementFactoryTest { WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record()); CommittedBundle<Record> input = - InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now()); + bundleFactory.createRootBundle(unencodable).add(record).commit(Instant.now()); ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer); enforcement.beforeElement(record); @@ -152,7 +153,7 @@ public class EncodabilityEnforcementFactoryTest { WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1); CommittedBundle<Integer> input = - InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now()); + bundleFactory.createRootBundle(unencodable).add(value).commit(Instant.now()); ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer); enforcement.beforeElement(value); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java index 4520504..16633ed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java @@ -46,12 +46,14 @@ import java.util.Collections; public class ImmutabilityEnforcementFactoryTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); private transient ImmutabilityEnforcementFactory factory; + private transient BundleFactory bundleFactory; private transient PCollection<byte[]> pcollection; private transient AppliedPTransform<?, ?, ?> consumer; @Before public void setup() { factory = new ImmutabilityEnforcementFactory(); + bundleFactory = InProcessBundleFactory.create(); TestPipeline p = TestPipeline.create(); pcollection = p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) @@ -71,7 +73,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { public void unchangedSucceeds() { WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); CommittedBundle<byte[]> elements = - InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); + bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now()); ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); enforcement.beforeElement(element); @@ -86,7 +88,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { public void mutatedDuringProcessElementThrows() { WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); CommittedBundle<byte[]> elements = - InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); + bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now()); ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); enforcement.beforeElement(element); @@ -107,7 +109,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes()); CommittedBundle<byte[]> elements = - InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now()); + bundleFactory.createRootBundle(pcollection).add(element).commit(Instant.now()); ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer); enforcement.beforeElement(element); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 077c0e7..736076c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -798,7 +798,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { Instant upstreamHold = new Instant(2048L); CommittedBundle<Integer> filteredBundle = - bundleFactory.createKeyedBundle(null, "key", filtered).commit(upstreamHold); + bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold); manager.updateWatermarks( created, filtered.getProducingTransformInternal(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java deleted file mode 100644 index 103ace5..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessBundleTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableList; - -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; - -/** - * Tests for {@link InProcessBundle}. - */ -@RunWith(JUnit4.class) -public class InProcessBundleTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void unkeyedShouldCreateWithNullKey() { - PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); - - InProcessBundle<Integer> inFlightBundle = InProcessBundle.unkeyed(pcollection); - - CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); - - assertThat(bundle.isKeyed(), is(false)); - assertThat(bundle.getKey(), nullValue()); - } - - private void keyedCreateBundle(Object key) { - PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); - - InProcessBundle<Integer> inFlightBundle = InProcessBundle.keyed(pcollection, key); - - CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); - assertThat(bundle.isKeyed(), is(true)); - assertThat(bundle.getKey(), equalTo(key)); - } - - @Test - public void keyedWithNullKeyShouldCreateKeyedBundle() { - keyedCreateBundle(null); - } - - @Test - public void keyedWithKeyShouldCreateKeyedBundle() { - keyedCreateBundle(new Object()); - } - - private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) { - PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of()); - - InProcessBundle<T> bundle = InProcessBundle.unkeyed(pcollection); - Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>(); - for (WindowedValue<T> elem : elems) { - bundle.add(elem); - expectations.add(equalTo(elem)); - } - Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher = - Matchers.<WindowedValue<T>>containsInAnyOrder(expectations); - assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher); - } - - @Test - public void getElementsBeforeAddShouldReturnEmptyIterable() { - afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList()); - } - - @Test - public void getElementsAfterAddShouldReturnAddedElements() { - WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1); - WindowedValue<Integer> secondValue = - WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L)); - - afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); - } - - @Test - public void addAfterCommitShouldThrowException() { - PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of()); - - InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection); - bundle.add(WindowedValue.valueInGlobalWindow(1)); - CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now()); - assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1))); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("3"); - thrown.expectMessage("committed"); - - bundle.add(WindowedValue.valueInGlobalWindow(3)); - } - - @Test - public void commitAfterCommitShouldThrowException() { - PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of()); - - InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection); - bundle.add(WindowedValue.valueInGlobalWindow(1)); - CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now()); - assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1))); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("committed"); - - bundle.commit(Instant.now()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 6736562..50b83fd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -93,6 +93,8 @@ public class InProcessEvaluationContextTest { private Collection<AppliedPTransform<?, ?, ?>> rootTransforms; private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; + private BundleFactory bundleFactory; + @Before public void setup() { InProcessPipelineRunner runner = @@ -110,6 +112,8 @@ public class InProcessEvaluationContextTest { rootTransforms = cVis.getRootTransforms(); valueToConsumers = cVis.getValueToConsumers(); + bundleFactory = InProcessBundleFactory.create(); + context = InProcessEvaluationContext.create( runner.getPipelineOptions(), @@ -393,37 +397,23 @@ public class InProcessEvaluationContextTest { } @Test - public void createBundleUnkeyedResultUnkeyed() { - CommittedBundle<KV<String, Integer>> newBundle = - context - .createBundle(InProcessBundle.unkeyed(created).commit(Instant.now()), downstream) - .commit(Instant.now()); - assertThat(newBundle.isKeyed(), is(false)); - } - - @Test public void createBundleKeyedResultPropagatesKey() { CommittedBundle<KV<String, Integer>> newBundle = context - .createBundle(InProcessBundle.keyed(created, "foo").commit(Instant.now()), downstream) + .createBundle( + bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), + downstream) .commit(Instant.now()); - assertThat(newBundle.isKeyed(), is(true)); assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); } @Test - public void createRootBundleUnkeyed() { - assertThat(context.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false)); - } - - @Test public void createKeyedBundleKeyed() { CommittedBundle<KV<String, Integer>> keyedBundle = context .createKeyedBundle( - InProcessBundle.unkeyed(created).commit(Instant.now()), "foo", downstream) + bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) .commit(Instant.now()); - assertThat(keyedBundle.isKeyed(), is(true)); assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3987a05b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java index d29ffac..d3d70e0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java @@ -332,7 +332,7 @@ public class TransformExecutorTest { WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo"); WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar"); CommittedBundle<String> inputBundle = - InProcessBundle.unkeyed(created).add(fooElem).add(barElem).commit(Instant.now()); + bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now()); when( registry.forApplication( downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) @@ -393,7 +393,7 @@ public class TransformExecutorTest { WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); CommittedBundle<byte[]> inputBundle = - InProcessBundle.unkeyed(pcBytes).add(fooBytes).commit(Instant.now()); + bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); when( registry.forApplication( pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) @@ -452,7 +452,7 @@ public class TransformExecutorTest { WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); CommittedBundle<byte[]> inputBundle = - InProcessBundle.unkeyed(pcBytes).add(fooBytes).commit(Instant.now()); + bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); when( registry.forApplication( pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))