Thanks so much, Kenn!
Exactly what I was looking for 😊

On 28.07.22, 19:32, "Kenneth Knowles" <k...@apache.org> wrote:

Yes, this is expected. The goal of replaceAll is to replace a transform with a 
different subgraph that implements precisely the same semantics. And since the 
rest of the graph depends on the PCollections, the new expanded transform is 
wired directly to the old outputs.
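A simplified model in plain Java can make this wiring concrete. It is not Beam's API. `Bounded`, `Value`, `GraphNode` and `ReplaceAllModel` are hypothetical names used only for illustration. The replacement expands into an output of its own, but the node is then wired to the original output object that downstream consumers already reference. As a result, the original output's boundedness is what any later traversal sees. In Beam itself, this mapping appears to be performed by the override factory's `mapOutputs` step; treat the model as an approximation of that behavior, not a description of Beam internals.

```java
/** Hypothetical stand-in for PCollection.IsBounded (not Beam's type). */
enum Bounded { BOUNDED, UNBOUNDED }

/** Hypothetical stand-in for a PCollection; its boundedness is fixed at creation. */
final class Value {
  final String name;
  final Bounded bounded;

  Value(String name, Bounded bounded) {
    this.name = name;
    this.bounded = bounded;
  }
}

/** Hypothetical stand-in for a graph node: a transform plus the output consumers read. */
final class GraphNode {
  final String fullName;
  String transform;
  Value output;

  GraphNode(String fullName, String transform, Value output) {
    this.fullName = fullName;
    this.transform = transform;
    this.output = output;
  }
}

final class ReplaceAllModel {
  /**
   * Replaces the node's transform. The replacement expands into its own output, but the
   * node stays wired to the ORIGINAL output object (which downstream consumers already
   * hold), so the original output's properties remain visible. Returns the discarded
   * expansion output so callers can inspect it.
   */
  static Value replace(GraphNode node, String newTransform, Bounded expansionBoundedness) {
    Value original = node.output;
    Value expanded = new Value(node.fullName + "/expanded", expansionBoundedness);
    node.transform = newTransform;
    node.output = original; // mapped back to the old output, not to `expanded`
    return expanded;
  }

  public static void main(String[] args) {
    GraphNode node =
        new GraphNode("boundedToUnbounded", "Create.Values", new Value("out", Bounded.BOUNDED));
    Value expanded = replace(node, "GenerateSequence", Bounded.UNBOUNDED);
    System.out.println(node.transform + " -> " + node.output.bounded); // GenerateSequence -> BOUNDED
    System.out.println("discarded expansion: " + expanded.bounded); // discarded expansion: UNBOUNDED
  }
}
```

This matches what the visitor in the quoted question observes: the node reports the new transform, while its output is still the original BOUNDED collection.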

First point: Certainly the streaming mode should also support bounded inputs, 
so the test as-is should still pass.

But still, it does seem potentially useful to increase test coverage by 
re-using the same test with an unbounded-but-finite input like you are doing. I 
am not sure this will work for all tests. Some transforms may require bounded 
input.

But to force it: You may be able to just run the visitor and set the 
boundedness property on the PCollections. Then of course your SparkRunner 
translation layer needs to know what to do. It may be simpler to just have a 
flag on your translator that translates Create.Values into something that looks 
unbounded at the Spark layer.
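The visitor approach might look roughly like the following plain-Java model. This is a sketch under assumptions: `Boundedness`, `Coll`, `ValueVisitor` and `ForceUnbounded` are hypothetical and are not Beam classes. In Beam itself, the closest hooks appear to be `Pipeline.PipelineVisitor#visitValue` and the `@Internal` setter `PCollection#setIsBoundedInternal`; confirm both against the SDK version you build with. The model only shows the pattern: visit every value once and flip its boundedness flag. A runner could run such a pass after `replaceAll` and before translation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for PCollection.IsBounded (not Beam's type). */
enum Boundedness { BOUNDED, UNBOUNDED }

/** Hypothetical mutable stand-in for a PCollection. */
final class Coll {
  final String name;
  Boundedness boundedness;

  Coll(String name, Boundedness boundedness) {
    this.name = name;
    this.boundedness = boundedness;
  }
}

/** Hypothetical visitor callback, invoked once per value in the graph. */
interface ValueVisitor {
  void visitValue(Coll value);
}

final class ForceUnbounded {
  /** Visits every value; the list is assumed to already be in topological order. */
  static void traverse(List<Coll> values, ValueVisitor visitor) {
    for (Coll value : values) {
      visitor.visitValue(value);
    }
  }

  /** Marks every BOUNDED value as UNBOUNDED and returns how many were changed. */
  static int forceUnbounded(List<Coll> values) {
    int[] changed = {0}; // single-element array so the lambda can update the counter
    traverse(values, value -> {
      if (value.boundedness == Boundedness.BOUNDED) {
        value.boundedness = Boundedness.UNBOUNDED;
        changed[0]++;
      }
    });
    return changed[0];
  }

  public static void main(String[] args) {
    List<Coll> values =
        Arrays.asList(
            new Coll("Create.Values/out", Boundedness.BOUNDED),
            new Coll("ParDo/out", Boundedness.BOUNDED));
    System.out.println(forceUnbounded(values)); // 2
    System.out.println(values.get(1).boundedness); // UNBOUNDED
  }
}
```

Flipping the flag changes only graph metadata. The runner's translation layer still has to turn the input into something that is actually unbounded at the Spark layer.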

Kenn

On Thu, Jul 28, 2022 at 2:01 AM Moritz Mack <mm...@talend.com> wrote:

Hi all,

I’m wondering if somebody could help and shed some light on the behavior of
Pipeline.replaceAll, particularly the outputs to expect after the replacement.
I’m currently looking into supporting VR (ValidatesRunner) tests for SparkRunner
in streaming mode [1]. Unfortunately, I didn’t succeed in replacing (wrapping)
the bounded Create.Values used as test input with an unbounded source in a way
that makes the node’s output UNBOUNDED. After the replacement, the output is
actually still the original one.
Is this expected? What would be the recommended way to achieve this?

Below is some code to explain further [2].
Also, related, [3] is a tiny PR to fix a broken assertion in 
PipelineTest.testReplaceAll().

Thanks,
Moritz

    pipeline.apply("boundedToUnbounded", Create.of(0L));

    pipeline.replaceAll(
        ImmutableList.of(
            PTransformOverride.of(
                application -> application.getTransform() instanceof Create.Values,
                // Replacement is GenerateSequence.from(Iterables.getOnlyElement(elements))
                new ValuesToUnboundedSequenceOverride())));

    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (node.getFullName().equals("boundedToUnbounded")) {
              assertThat(node.getTransform(), Matchers.instanceOf(GenerateSequence.class));

              // FIXME Node still contains the original BOUNDED output. But why?
              PCollection<?> output = Iterables.getOnlyElement(node.getOutputs().values());
              assertThat(output.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }
        });


[1] https://github.com/apache/beam/pull/22473
[2] https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
[3] https://github.com/apache/beam/pull/22485


As a recipient of an email from Talend, your contact personal data will be on 
our systems. Please see our privacy notice. <https://www.talend.com/privacy/>
