Hi all,
I'm wondering if somebody could help shed some light on the behavior of
Pipeline.replaceAll, particularly the outputs to expect after the replacement.
I’m currently looking into supporting VR (ValidatesRunner) tests for
SparkRunner in streaming mode [1]. Unfortunately, I didn’t succeed in replacing
(wrapping) the bounded Create.Values used as test input with an unbounded
source in such a way that the node’s output would be UNBOUNDED. After the
replacement, the node's output is still the original, BOUNDED one.
Is this expected? What would be the recommended way to achieve this?
Below is some code to explain further [2].
Also, related, [3] is a tiny PR to fix a broken assertion in
PipelineTest.testReplaceAll().
Thanks,
Moritz
    pipeline.apply("boundedToUnbounded", Create.of(0L));

    pipeline.replaceAll(
        ImmutableList.of(
            PTransformOverride.of(
                application -> application.getTransform() instanceof Create.Values,
                // Replacement is GenerateSequence.from(Iterables.getOnlyElement(elements))
                new ValuesToUnboundedSequenceOverride())));

    pipeline.traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (node.getFullName().equals("boundedToUnbounded")) {
              assertThat(node.getTransform(), Matchers.instanceOf(GenerateSequence.class));
              // FIXME Node still contains the original BOUNDED output. But why?
              PCollection<?> output = Iterables.getOnlyElement(node.getOutputs().values());
              assertThat(output.isBounded(), Matchers.equalTo(PCollection.IsBounded.UNBOUNDED));
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }
        });

[1] https://github.com/apache/beam/pull/22473
[2]
https://github.com/apache/beam/compare/master...mosche:beam:BoundedToUnboundedReplaceAll
[3] https://github.com/apache/beam/pull/22485