Hi,
sorry for resurrecting such an old thread but are there already thoughts on
how the quiescence handling will work for runner-independent tests?

I was thinking about how to make the RunnableOnService tests work when
executed in "true-streaming" mode, i.e. when the job would normally never
finish? Right now, the tests work because the sources finish at some point
and we verify that the PAssert DoFn sees the correct results. With
streaming runners this "finished" bit is hard to do and I feel that it is
related to the quiescence idea expression in the document.

Cheers,
Aljoscha

On Thu, 31 Mar 2016 at 19:32 Ben Chambers <bchamb...@google.com.invalid>
wrote:

> On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw
> <rober...@google.com.invalid>
> wrote:
>
> > On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
> > <bchamb...@google.com.invalid> wrote:
> > > My only concern is that in the example, you first need to declare all
> the
> > > inputs, then the pipeline to be tested, then all the outputs. This can
> > lead
> > > to tests that are hard to follow, since what you're really testing is
> an
> > > interleaving more like "When these inputs arrive, I get this output.
> Then
> > > when this happens, I get that output. Etc.".
> >
> > +1 to pursuing this direction.
> >
> > > What if instea of returning a PTransform<PBegin, PCollection<Long>> we
> > had
> > > a "TestSource".
> >
> > I think TestSource is a PTransform<PBegin, PCollection<Long>>.
> >
>
> Maybe? If we want it to easily support multiple inputs, maybe you do
> `testSource.getInput(tag)` to get the `PTransform<PBegin, PCollection<T>>`
> associated with a given tag? But yes, I intended the `TestSource` to be
> usable within the pipeline to actually produce the data.
>
> >
> > > so we did something like:
> > >
> > > TestPipeline p = TestPipeline.create();
> > > TestSource source = p.testSource();
> > >
> > > // Set up pipeline reading from source.
> > > PCollection<Long> sum = ...;
> >
> > I'm really curious what the "..." looks like. How are we using the
> source?
> >
>
> Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure about
> naming, of course.
>
> >
> > > BeamAssert sumAssert = BeamAssert.sum();
> >
> > Did you mean BeamAssert.that(sum)?
> >
>
> Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like that.
>
> > // Test for the Speculative Pane
> > > source.addElements(...);
> > > source.advanceWatermark(...);
> > > sumAssert.thatWindowPane(...);
> > >
> > > // Test for the On Time Pane
> > > source.addElements(...)
> > > source.advanceWatermark(...);
> > > sumAssert.thatWindowPane(...);
> > >
> > > etc.
> >
> > Is there a p.run() at the end?
> >
>
> Almost certainly.
>
>
> > > We could also allow TestSource to work with multiple input pipelines
> like
> > > this:
> > >
> > > TestSource<Integer> intSource = p.testSource(new
> > TypeDescriptor<Integer>());
> > > TestSource<Long> longSource = p.testSource(new TypeDescriptor<Long>());
> > > ...
> > > intSource.addElements(...);
> > > longSource.addElements(...);
> > > etc.
> >
> > Would we get at total ordering on the addition of elements/advancement
> > of watermarks across sources by the temporal ordering of these
> > operations in the users program (e.g. by incrementing some global
> > counter)?
> >
>
> Ideally? I was focusing on the interleaving of inputs/assertions, but we
> can talk more about this.
>
>
> > > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh <tg...@google.com.invalid>
> > > wrote:
> > >
> > >> Hey everyone;
> > >>
> > >> I'd still be happy to get feedback. I'm going to start working on this
> > >> early next week
> > >>
> > >> Thanks,
> > >>
> > >> Thomas
> > >>
> > >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh <tg...@google.com>
> wrote:
> > >>
> > >> > Hey everyone,
> > >> >
> > >> > I've been working on a proposal to expand the capabilities of our
> > testing
> > >> > API, mostly around writing deterministic tests for pipelines that
> have
> > >> > interesting triggering behavior, especially speculative and late
> > >> triggers.
> > >> >
> > >> > I've shared a doc here
> > >> > <
> > >>
> >
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing
> > >
> > >> containing
> > >> > the proposal and some examples, with world comment access + explicit
> > >> > committer edit access. I'd welcome any feedback you all have.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Thomas
> > >> >
> > >>
> >
>

Reply via email to