Yes, this is expected. The goal of replaceAll is to replace a transform
with a different subgraph that implements precisely the same semantics. And
since the rest of the graph depends on the PCollections, the new expanded
transform is wired directly to the old outputs.
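
To make that concrete, here is a toy model in plain Java. It is only a sketch
of the behavior described above, not Beam's implementation: ReplaceAllSketch,
Output, Node and this replaceAll are hypothetical stand-ins for the pipeline,
a PCollection, a hierarchy node and Pipeline.replaceAll. The point is that the
transform on the node changes while the output object, which downstream
consumers already hold, stays the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model only: none of these types are Beam classes. */
public class ReplaceAllSketch {
  public enum IsBounded { BOUNDED, UNBOUNDED }

  /** Hypothetical stand-in for a PCollection: a value with a boundedness flag. */
  public static final class Output {
    public final String name;
    public IsBounded bounded;

    public Output(String name, IsBounded bounded) {
      this.name = name;
      this.bounded = bounded;
    }
  }

  /** Hypothetical stand-in for a node in the transform hierarchy. */
  public static final class Node {
    public String transform;
    public final Output output;

    public Node(String transform, Output output) {
      this.transform = transform;
      this.output = output;
    }
  }

  /** Swaps the transform of every matching node; the node's output object is left untouched. */
  public static void replaceAll(List<Node> graph, Predicate<Node> matcher, String replacement) {
    for (Node node : graph) {
      if (matcher.test(node)) {
        // Only the transform changes. Downstream nodes keep their reference
        // to the original output, so its boundedness is preserved as well.
        node.transform = replacement;
      }
    }
  }

  /** Builds a one-node graph, applies the replacement, and returns the node. */
  public static Node demo() {
    Output out = new Output("boundedToUnbounded.out", IsBounded.BOUNDED);
    Node node = new Node("Create.Values", out);
    List<Node> graph = new ArrayList<>();
    graph.add(node);
    replaceAll(graph, n -> n.transform.equals("Create.Values"), "GenerateSequence");
    return node;
  }

  public static void main(String[] args) {
    Node node = demo();
    // Prints "GenerateSequence -> BOUNDED"
    System.out.println(node.transform + " -> " + node.output.bounded);
  }
}
```

In this model, as in your test, the node reports the new transform but its
output is still the original BOUNDED one.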

First point: Certainly the streaming mode should also support bounded
inputs, so the test as-is should still pass.

But still, it does seem potentially useful to increase test coverage by
re-using the same test with an unbounded-but-finite input like you are
doing. I am not sure this will work for all tests. Some transforms may
require bounded input.

But to force it: You may be able to just run the visitor and set the
boundedness property on the PCollections. Then of course your SparkRunner
translation layer needs to know what to do. It may be simpler to just have
a flag on your translator that translates Create.Values into something that
looks unbounded at the Spark layer.

Kenn

On Thu, Jul 28, 2022 at 2:01 AM Moritz Mack <[email protected]> wrote:

> Hi all,
>
>
>
> Wondering if somebody could help and shed some light on the behavior of
> Pipeline.replaceAll, particularly the outputs to expect after the
> replacement.
>
> I’m currently looking into supporting VR tests for SparkRunner in
> streaming mode [1]. Unfortunately, I didn’t succeed in replacing (wrapping)
> the bounded Create.Values used as test input with an unbounded source in
> such a way that the node’s output would be UNBOUNDED. After the replacement
> the output is actually still the original one.
>
> Is this expected? What would be the recommended way to achieve this?
>
>
>
> Below some code to explain further [2].
>
> Also, related, [3] is a tiny PR to fix a broken assertion in
> PipelineTest.testReplaceAll().
>
>
>
> Thanks,
>
> Moritz
>
>
>
>     pipeline.apply("boundedToUnbounded", Create.of(0L));
>
>     pipeline.replaceAll(
>         ImmutableList.of(
>             PTransformOverride.of(
>                 application -> application.getTransform() instanceof Create.Values,
>                 // Replacement is GenerateSequence.from(Iterables.getOnlyElement(elements)))
>                 new ValuesToUnboundedSequenceOverride())));
>
>     pipeline.traverseTopologically(
>         new PipelineVisitor.Defaults() {
>           @Override
>           public CompositeBehavior enterCompositeTransform(Node node) {
>             if (node.getFullName().equals("boundedToUnbounded")) {
>               assertThat(node.getTransform(), Matchers.instanceOf(GenerateSequence.class));
>
>               // FIXME Node still contains the original BOUNDED output. But why?
>               PCollection<?> output = Iterables.getOnlyElement(node.getOutputs().values());
>               assertThat(output.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
>             }
>             return CompositeBehavior.ENTER_TRANSFORM;
>           }
>         });
>
>
>
>
>
> [1] https://github.com/apache/beam/pull/22473
>
> [2]
> https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
>
>
> [3] https://github.com/apache/beam/pull/22485
>