I think this was initially motivated by BEAM-758
<https://issues.apache.org/jira/browse/BEAM-758>. Copying from that issue:

    In the forthcoming runner API, a user will be able to save a pipeline
to JSON and then run it repeatedly.

    Many pieces of code (e.g., BigQueryIO.Read or Write) rely on a single
random value (nonce). These values are typically generated at pipeline
construction time (in PTransform#expand), so that they are deterministic
(don't change across retries of DoFns) and global (are the same across all
workers).

    However, once the runner API lands, the existing code would result in
the same nonce being reused across jobs, which breaks BigQueryIO. Other
possible solutions:
       * Generate the nonce in Create(1) | ParDo, then use it as a side input.
Should work, as long as side inputs are actually checkpointed. But this does
not work for BoundedSource, which cannot accept side inputs.
       * If a nonce is only needed for the lifetime of one bundle, it can be
generated in startBundle and used in processElement/finishBundle/tearDown.
       * Add some context somewhere that lets user code access the unique step
name, and somehow generate a nonce consistently from it, e.g. by hashing. This
will usually work, but such a context is similarly not available to sources.
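
Sketching that first option for concreteness (the helper and step names below
are mine, not from the issue, and this assumes the usual Beam Java SDK
classes):

  import java.util.concurrent.ThreadLocalRandom;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.View;
  import org.apache.beam.sdk.values.PCollectionView;

  // Build a singleton side input holding a per-run nonce. The DoFn runs at
  // execution time, so each Pipeline.run() sees a fresh value -- provided the
  // runner checkpoints the side input rather than re-executing it on retry.
  static PCollectionView<Long> perRunNonce(Pipeline pipeline) {
    return pipeline
        .apply("Seed", Create.of(1))
        .apply("GenerateNonce", ParDo.of(new DoFn<Integer, Long>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(ThreadLocalRandom.current().nextLong());
          }
        }))
        .apply("AsSingleton", View.<Long>asSingleton());
  }

Downstream DoFns can then read the same nonce on every worker via
c.sideInput(...), which is exactly what BoundedSource cannot do.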

I believe your proposal is to add such a nonce to the root PipelineOptions
object -- perhaps `String getRunNonce()` or something like that. This
would let us have a different nonce for every Pipeline.run() call, but it
would require every runner to populate it.
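
Concretely, I'm picturing something along these lines, whether on the root
interface itself or a sub-interface (entirely hypothetical -- no such option
exists today):

  import org.apache.beam.sdk.options.PipelineOptions;

  // Hypothetical sketch only: a per-run nonce that every runner would be
  // required to repopulate with a fresh value on each Pipeline.run() call.
  public interface NonceOptions extends PipelineOptions {
    String getRunNonce();
    void setRunNonce(String runNonce);
  }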

My 2c: this would be an easy change that unblocks the issue, but it adds to
what we demand of runner authors. Longer term, plumbing a context into places
like BoundedSource and providing the value there is a better idea.
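
To make that longer-term idea concrete, a purely hypothetical shape -- none
of these types or hooks exist today:

  // Hypothetical: an execution-scoped context the runner hands to user code,
  // instead of user code reading a pipeline option.
  public interface RunContext {
    /** Stable within one execution, different across Pipeline.run() calls. */
    String getRunNonce();
  }

  // e.g. a future source hook could receive it alongside PipelineOptions:
  // List<? extends BoundedSource<T>> splitIntoBundles(
  //     long desiredBundleSizeBytes, PipelineOptions options, RunContext ctx);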

Dan

On Fri, Jan 20, 2017 at 11:30 AM, Davor Bonaci <[email protected]> wrote:

> Expecting runners to populate, or override, SDK-level pipeline options
> isn't a great thing, particularly in a scenario that would affect
> correctness.
>
> The main thing is discoverability of a subtle API like this -- there's
> little chance somebody writing a new runner would stumble across this and
> do the right thing. It would be much better to make expectations from a
> runner clear, say, via a runner-provided "context" API. I'd stay away from
> a pipeline option with a default value.
>
> The other contentious topic here is the use of a job-level or
> execution-level identifier. This easily becomes ambiguous in the presence
> of Flink's savepoints, Dataflow's update, fast re-execution, canary vs.
> production pipelines, cross-job optimizations, etc. I think we'd be better
> off with a transform-level nonce than a job-level one.
>
> Finally, the real solution is to enhance the model and make such
> functionality available to everyone, e.g., roughly "init" + "checkpoint" +
> "side-input to source / splittabledofn / composable io".
>
> --
>
> Practically, to solve the problem at hand quickly, I'd be in favor of a
> context-based approach.
>
> On Thu, Jan 19, 2017 at 10:22 AM, Sam McVeety <[email protected]>
> wrote:
>
> > Hi folks, I'm looking for feedback on whether the following is a
> > reasonable approach to handling ValueProviders that are intended to be
> > populated at runtime by a given Runner (e.g. a Dataflow job ID, which is
> > a GUID from the service).  Two potential pieces of a solution:
> >
> > 1. Annotate such parameters with @RunnerProvided, which results in an
> > Exception if the user manually tries to set the parameter.
> >
> > 2. Allow for a DefaultValueFactory to be present for the set of Runners
> > that do not override the parameter.
> >
> > Best,
> > Sam
> >
>
