Thanks so much, Kenn! Exactly what I was looking for 😊

On 28.07.22, 19:32, "Kenneth Knowles" <k...@apache.org> wrote:
Yes, this is expected. The goal of replaceAll is to replace a transform with a different subgraph that implements precisely the same semantics. And since the rest of the graph depends on the PCollections, the new expanded transform is wired directly to the old outputs.

First point: Certainly the streaming mode should also support bounded inputs, so the test as-is should still pass. Still, it does seem potentially useful to increase test coverage by re-using the same test with an unbounded-but-finite input like you are doing. I am not sure this will work for all tests, since some transforms may require bounded input.

But to force it: you may be able to just run the visitor and set the boundedness property on the PCollections. Then, of course, your SparkRunner translation layer needs to know what to do. It may be simpler to just have a flag on your translator that translates Create.Values into something that looks unbounded at the Spark layer.

Kenn

On Thu, Jul 28, 2022 at 2:01 AM Moritz Mack <mm...@talend.com> wrote:

Hi all,

Wondering if somebody could help and shed some light on the behavior of Pipeline.replaceAll, particularly the outputs to expect after the replacement.

I’m currently looking into supporting VR tests for SparkRunner in streaming mode [1]. Unfortunately, I didn’t succeed in replacing (wrapping) the bounded Create.Values used as test input with an unbounded source in a way that the node’s output would be UNBOUNDED. After the replacement, the output is actually still the original one.

Is this expected? What would be the recommended way to achieve this? Below is some code to explain further [2].

Also, related, [3] is a tiny PR to fix a broken assertion in PipelineTest.testReplaceAll().
Thanks,
Moritz

    pipeline.apply("boundedToUnbounded", Create.of(0L));

    pipeline.replaceAll(
        ImmutableList.of(
            PTransformOverride.of(
                application -> application.getTransform() instanceof Create.Values,
                // Replacement is GenerateSequence.from(Iterables.getOnlyElement(elements))
                new ValuesToUnboundedSequenceOverride())));

    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (node.getFullName().equals("boundedToUnbounded")) {
              assertThat(node.getTransform(), Matchers.instanceOf(GenerateSequence.class));
              // FIXME Node still contains the original BOUNDED output. But why?
              PCollection<?> output = Iterables.getOnlyElement(node.getOutputs().values());
              assertThat(output.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }
        });

[1] https://github.com/apache/beam/pull/22473
[2] https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
[3] https://github.com/apache/beam/pull/22485

As a recipient of an email from Talend, your contact personal data will be on our systems. Please see our privacy notice. <https://www.talend.com/privacy/>
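To make the behavior Kenn describes concrete, here is a minimal toy model in plain Java. It is not Beam code. `ReplaceAllToyModel`, `ToyNode`, `ToyCollection`, `replaceAll` and `markAllUnbounded` are hypothetical stand-ins for `Pipeline`, the `Node` type and `PCollection` used in the test code above. The sketch assumes only what the thread states: the replacement is wired to the original output object, so that object's boundedness stays the same until a separate pass overrides it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical stand-ins for Beam's
// Pipeline, Node and PCollection, not the real SDK API.
final class ReplaceAllToyModel {
  enum IsBounded { BOUNDED, UNBOUNDED }

  // Stand-in for a PCollection: object identity plus a boundedness flag.
  static final class ToyCollection {
    IsBounded isBounded;

    ToyCollection(IsBounded isBounded) {
      this.isBounded = isBounded;
    }
  }

  // Stand-in for a graph node: a transform, its input (null for roots) and its output.
  static final class ToyNode {
    final String name;
    String transform;
    final ToyCollection input;
    final ToyCollection output;

    ToyNode(String name, String transform, ToyCollection input, ToyCollection output) {
      this.name = name;
      this.transform = transform;
      this.input = input;
      this.output = output;
    }
  }

  private final List<ToyNode> nodes = new ArrayList<>();

  // Adds a node that reads `input` (null for a root) and produces a new collection.
  ToyNode apply(String name, String transform, ToyCollection input, IsBounded outputBoundedness) {
    ToyNode node = new ToyNode(name, transform, input, new ToyCollection(outputBoundedness));
    nodes.add(node);
    return node;
  }

  // Swaps the transform of every matching node. The output collection is
  // deliberately kept: downstream nodes hold references to it, so the
  // replacement is wired to the old output and its flags are unchanged.
  void replaceAll(String matchTransform, String replacementTransform) {
    for (ToyNode node : nodes) {
      if (node.transform.equals(matchTransform)) {
        node.transform = replacementTransform;
      }
    }
  }

  // Walks every node and forces its output collection to UNBOUNDED,
  // modeling a visitor pass that overrides the boundedness property.
  void markAllUnbounded() {
    for (ToyNode node : nodes) {
      node.output.isBounded = IsBounded.UNBOUNDED;
    }
  }
}
```

With a `Create.Values` root feeding a consumer, `replaceAll` changes the root's transform, but `source.output` remains the very object the consumer reads, and it is still `BOUNDED`; only the separate `markAllUnbounded` pass flips it. In the real SDK the runner's translation would still have to cope with such a forced flag, which is why a translator-level flag may be the simpler route.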