Hi everyone,

It's my understanding that Beam names its transforms in a
hierarchical fashion. So for example if I have:

// main
p.apply("foo_name", new Foo())

// Foo.java
public class Foo extends PTransform<...> {

  @Override
  public PCollection<...> expand(PCollection<...> input) {
    return input.apply(ParDo.of(...));
  }

}

Then the ParDo name is going to be "foo_name/ParDo(Anonymous)". If I add a
name, like so, input.apply("bar_name", ParDo.of(... )), the name of the
ParDo will be "foo_name/bar_name".

Now it's my understanding that this name is passed to Flink as the uid, and
Flink uses the uid to save and restore the state of the transforms. That
means that if I decide to inline Foo, that is, replace
p.apply("foo_name", new Foo()) with p.apply("bar_name", ParDo.of(...)), I
would break any saved state of my pipeline for this ParDo, assuming it's a
stateful ParDo, unless I explicitly name my ParDo like so:
p.apply("foo_name/bar_name", ParDo.of(...))
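
To make the concern concrete, here is a toy sketch in plain Java of the
naming rule as I understand it. It is not Beam or Flink code:
TransformNameSketch and fullName are hypothetical names made up for
illustration, and the rule that the full name is the enclosing names joined
with "/" is my assumption from the description above, not something I have
verified against the runner.

```java
import java.util.List;

// Toy model only. This is NOT Beam code; the class and method are hypothetical.
final class TransformNameSketch {

    // Assumption: the full name of a transform is the names of its enclosing
    // composite transforms plus its own name, joined with '/'.
    static String fullName(List<String> path) {
        return String.join("/", path);
    }

    public static void main(String[] args) {
        // ParDo named "bar_name" nested inside the composite "foo_name".
        String nested = fullName(List.of("foo_name", "bar_name"));
        // Same ParDo applied directly on the pipeline after inlining Foo.
        String inlined = fullName(List.of("bar_name"));
        // Inlined ParDo that keeps the old full name as its explicit name.
        String inlinedOldName = fullName(List.of("foo_name/bar_name"));

        System.out.println("nested:            " + nested);
        System.out.println("inlined:           " + inlined);
        System.out.println("inlined, old name: " + inlinedOldName);
        System.out.println("nested matches inlined?           " + nested.equals(inlined));
        System.out.println("nested matches inlined, old name? " + nested.equals(inlinedOldName));
    }
}
```

If the name really is what ends up as the uid, the plain inlined version
would no longer match the name the state was saved under, while the version
that keeps the old full name would.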

I haven't tested this yet (I will be doing that over the weekend), but
looking at the Beam Flink Runner code and the Beam and Flink documentation,
I think my description above is correct.

Is there a way to have these types of refactorings not break the saved
state? Is there a best-practices guide somewhere that explains these
scenarios?

Thank you,
Cristian
