Kenn,

Thanks for the pointer regarding the source watermark. It wasn't set to
MAX_TIMESTAMP at end of input, and hence the aggregation for the global
window was never emitted. I've got the assertions almost working now; I
just need to propagate the exceptions to the unit test driver, and then we
should be ready for a new revision of the PR.
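
For reference, here is a rough sketch of the fix, in case it helps other
runner authors. The adapter and emitter names below are placeholders (they
are not the actual Apex runner classes); the relevant part is emitting a
final watermark of BoundedWindow.TIMESTAMP_MAX_VALUE once the bounded
input is exhausted:

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    public class BoundedSourceAdapter {

      /** Hypothetical hook through which the runner emits watermarks. */
      interface WatermarkEmitter {
        void emitWatermark(Instant watermark);
      }

      private final WatermarkEmitter emitter;

      BoundedSourceAdapter(WatermarkEmitter emitter) {
        this.emitter = emitter;
      }

      /** Called after the bounded source has produced its last record. */
      void onEndOfInput() {
        // Without this final watermark, downstream GroupByKeys never see
        // the global window close and the final aggregation is not emitted.
        emitter.emitWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
      }
    }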

Thomas

On Tue, Sep 27, 2016 at 10:17 PM, Thomas Weise <thomas.we...@gmail.com>
wrote:

> I have GroupByKey working; here is a unit test:
>
> https://github.com/tweise/incubator-beam/blob/BEAM-261.sideinputs/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java#L65L110
>
> For the earlier PAssert example, PAssert.GroupGlobally will assign the
> global window and remove any triggering, so the subsequent GroupByKey
> won't emit an aggregate. I'm trying to figure out what I'm missing.
>
> >     PCollection<Integer> pcollection = pipeline.apply(Create.of(...));
> >     PAssert.that(pcollection).empty();
>
> Thomas
>
>
> On Tue, Sep 27, 2016 at 5:02 PM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
>> Hi Thomas,
>>
>> Great news about the side inputs! The only thing you should need for most
>> PAsserts to work is GroupByKey. A few PAsserts require side inputs, so if
>> you got those working then you should have everything you need for all the
>> PAsserts.
>>
>> The lack of triggering in PAsserts like the one you mention is because
>> they rely on the behavior that the aggregation for a window is emitted
>> when the window expires and is garbage collected: all the values for the
>> window end up in a single aggregation, which is therefore the final value
>> and can be tested. This happens as part of
>> GroupAlsoByWindowViaWindowSetDoFn (the logic itself lives in
>> ReduceFnRunner), so if you have state and timers working, you should see
>> output.
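>>
>> To make that concrete, here is a minimal test sketch using only standard
>> Beam SDK calls (the input values and the expected sum of 6 are just
>> illustrative, and "pipeline" is assumed to be an existing Pipeline as in
>> your snippet). The assertion can only pass once the garbage-collection
>> timer for the global window fires and ReduceFnRunner emits the final pane:
>>
>>     import org.apache.beam.sdk.testing.PAssert;
>>     import org.apache.beam.sdk.transforms.Create;
>>     import org.apache.beam.sdk.transforms.Sum;
>>     import org.apache.beam.sdk.values.PCollection;
>>
>>     // The sum only appears in the final pane, which is emitted when the
>>     // global window expires, so this exercises the state/timer path.
>>     PCollection<Integer> sums =
>>         pipeline.apply(Create.of(1, 2, 3)).apply(Sum.integersGlobally());
>>     PAssert.that(sums).containsInAnyOrder(6);
>>     pipeline.run();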
>>
>> If this doesn't seem to be happening, maybe you can give some more
>> details?
>>
>> Kenn
>>
>> On Tue, Sep 27, 2016 at 7:09 PM Thomas Weise <t...@apache.org> wrote:
>>
>> > Hi Kenn,
>> >
>> > Thanks, this was very helpful. I got the side input translation working
>> > now, although I want to go back and see if the View.asXYZ expansions can
>> > be simplified.
>> >
>> > But before that I need to tackle PAssert, which is the next blocker for
>> > me to get many of the integration tests working. I see that the PAsserts
>> > generate TimestampedValueInGlobalWindow with no triggers, so grouping
>> > will accumulate state but not emit anything
>> > (PAssert$0/GroupGlobally/GatherAllOutputs/GroupByKey).
>> >
>> >     PCollection<Integer> pcollection = pipeline.apply(Create.of(...));
>> >     PAssert.that(pcollection).empty();
>> >
>> > Is there a good place to look for a basic understanding of PAssert and
>> > what the runner needs to support?
>> >
>> > Thanks,
>> > Thomas
>> >
>> >
>> >
>> > On Thu, Sep 15, 2016 at 11:51 AM, Kenneth Knowles <k...@google.com.invalid>
>> > wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > The side inputs 1-pager is a forward-looking document for the design
>> > > of side inputs in Beam once the portability layers are completed. The
>> > > current SDK and implementations do not quite respect the same
>> > > abstraction boundaries, even though they are similar.
>> > >
>> > > Here are some specifics about that 1-pager that I hope will help you
>> > > right now:
>> > >
>> > >  - The purple cylinder that says "Runner materializes" corresponds to
>> > > the CreatePCollectionView transform. Eventually this should not appear
>> > > in the SDK or the pipeline representation, but today that is where you
>> > > put your logic to write to some runner-specific storage medium, etc.
>> > >  - This "Runner materializes" / "CreatePCollectionView" is consistent
>> > > with streaming, of course. When new data arrives, the runner makes the
>> > > new side input value available. Most of the View.asXYZ transforms have
>> > > a GroupByKey within them, so the triggering on the side input
>> > > PCollection will regulate this.
>> > >  - The red "RPC" boundary in the diagram will be part of the
>> > > cross-language Fn API. For today, that layer is not present, and it is
>> > > the Java class ViewFn on each PCollectionView<ViewT>. It takes an
>> > > Iterable<WindowedValue<ElemT>> and produces a ViewT.
>> > >  - If we were to use the existing ViewFns without modification, the
>> > > primitive "access_pattern" would be "iterable", not "multimap". Thus,
>> > > the access pattern does not support fetching an individual KV record
>> > > efficiently when the side input is large (when it is small, the map
>> > > can be built in memory and cached). As we move forwards, this should
>> > > change.
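>> > >
>> > > To tie those pieces together, here is a minimal pipeline-level sketch
>> > > (standard SDK calls only; the values are placeholders and "pipeline"
>> > > is an existing Pipeline). As described above, View.asSingleton()
>> > > expands to an aggregation that ends in CreatePCollectionView, which is
>> > > where the runner hooks in its materialization, and c.sideInput(view)
>> > > is what ultimately goes through the ViewFn to turn the materialized
>> > > values into the ViewT (here an Integer):
>> > >
>> > >     import org.apache.beam.sdk.transforms.Create;
>> > >     import org.apache.beam.sdk.transforms.DoFn;
>> > >     import org.apache.beam.sdk.transforms.ParDo;
>> > >     import org.apache.beam.sdk.transforms.View;
>> > >     import org.apache.beam.sdk.values.PCollection;
>> > >     import org.apache.beam.sdk.values.PCollectionView;
>> > >
>> > >     // Side input: a single Integer, materialized by the runner.
>> > >     final PCollectionView<Integer> view =
>> > >         pipeline.apply("side", Create.of(42))
>> > >             .apply(View.<Integer>asSingleton());
>> > >
>> > >     // Main input: each element reads the side input for its window.
>> > >     PCollection<String> main =
>> > >         pipeline.apply("main", Create.of("a", "b"));
>> > >     main.apply(ParDo.of(new DoFn<String, String>() {
>> > >           @ProcessElement
>> > >           public void processElement(ProcessContext c) {
>> > >             c.output(c.element() + ":" + c.sideInput(view));
>> > >           }
>> > >         }).withSideInputs(view));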
>> > >
>> > > And here are answers beyond the side input 1-pager:
>> > >
>> > >  - The problem of expiry of the side input data is [BEAM-260]. The
>> > > solution is pretty easy, but I have been waiting to send out my
>> > > proposal to solve it with a [WindowMappingFn] since we have so many
>> > > proposals already in flight. I am sharing it now, here, since you
>> > > brought it up again.
>> > >  - A good, though very large, reference is the recent addition of side
>> > > inputs to the Flink runner in [PR #737] by Aljoscha. In particular, it
>> > > adds [SideInputHandler] as a runner-independent way to build side
>> > > inputs on top of StateInternals. I suspect you would benefit from
>> > > using this.
>> > >
>> > > I hope this helps!
>> > >
>> > > Kenn
>> > >
>> > > [BEAM-260] https://issues.apache.org/jira/browse/BEAM-260
>> > > [WindowMappingFn] https://s.apache.org/beam-windowmappingfn-1-pager
>> > > [PR #737] https://github.com/apache/incubator-beam/pull/737
>> > > [SideInputHandler] https://github.com/apache/incubator-beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
>> > >
>> > > On Thu, Sep 15, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I'm working on the Apex runner
>> > > > (https://github.com/apache/incubator-beam/pull/540) and based on the
>> > > > integration test results my next target is support for
>> > > > PCollectionView.
>> > > >
>> > > > I looked at the side inputs doc
>> > > > (https://s.apache.org/beam-side-inputs-1-pager) and see that a
>> > > > suggested implementation approach is RPC.
>> > > >
>> > > > Apex is a streaming engine where individual records flow through the
>> > > > pipeline and operators process data once it becomes available. Hence
>> > > > I'm also looking at side inputs as a stream vs. a call to fetch a
>> > > > specific record. But wouldn't that also require a ParDo operator to
>> > > > hold on to the side input state until it is no longer needed (based
>> > > > on expiry of the window)?
>> > > >
>> > > > I would appreciate your thoughts on this. Is there a good
>> > > > streaming-based implementation to look at for reference? Also, any
>> > > > suggestions on how to break the support for side inputs into
>> > > > multiple tasks that can be taken up independently?
>> > > >
>> > > > Thanks!
>> > > > Thomas
>> > > >
>> > >
>> >
>>
>
>
