Re: Suggestion for Writing Sink Implementation

2016-08-17 Thread Maximilian Michels
Hi Kenneth,

The problem is that the Write transform is not supported in the streaming
execution path of the Flink Runner, because streaming execution doesn't
currently support side inputs. A PR is open to fix that.

Cheers,
Max

On Thu, Jul 28, 2016 at 8:56 PM, Kenneth Knowles  
wrote:
> Hi Sumit,
>
> I see what has happened here, from that snippet you pasted from the Flink
> runner's code [1]. Thanks for looking into it!
>
> The Flink runner today appears to reject Write.Bounded transforms in
> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> intent of that code, I believe, was to special case UnboundedFlinkSink to
> make it easy to use an existing Flink sink, not to disable all other Write
> transforms. What do you think, Max?
>
> Until we fix this issue, you should use ParDo transforms to do the writing.
> If you can share a little about your sink, we may be able to suggest
> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>
> Kenn
>
> [1]
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
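The ParDo-based write pattern suggested above can be sketched in plain Java, without the Beam SDK, as a "bundle" lifecycle of setup, per-element write, and teardown. This is a hedged illustration only: `SinkClient` is a hypothetical stand-in for a real storage client, and in actual Beam code the three steps would live in a DoFn's startBundle/processElement/finishBundle methods.

```java
// Hedged sketch (not Beam code): the ParDo-style write pattern from the
// thread, reduced to plain Java. "SinkClient" is a hypothetical stand-in
// for the storage system's client API.
import java.util.ArrayList;
import java.util.List;

public class ParDoWriteSketch {

    /** Hypothetical storage client; collects writes in memory for illustration. */
    static class SinkClient {
        final List<String> store = new ArrayList<>();
        boolean open;

        void open() { open = true; }                    // startBundle: connect
        void write(String record) {
            if (!open) throw new IllegalStateException("client not open");
            store.add(record);                          // processElement: write
        }
        void close() { open = false; }                  // finishBundle: flush/close
    }

    /** Processes one "bundle" of elements the way a writing DoFn would. */
    static List<String> writeBundle(List<String> bundle) {
        SinkClient client = new SinkClient();
        client.open();                  // startBundle
        for (String element : bundle) {
            client.write(element);      // processElement, once per element
        }
        client.close();                 // finishBundle
        return client.store;
    }

    public static void main(String[] args) {
        System.out.println(writeBundle(List.of("a", "b", "c")));
    }
}
```

The same shape applies regardless of the runner, which is what makes a ParDo-based write more portable than a runner-special-cased Sink.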
>
>
> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>> runner, and I'll then defer to somebody familiar with it.
>>
>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit 
>> wrote:
>>
>> > Thanks a lot Eugene.
>> >
>> > >>> My immediate requirement is to run this Sink on FlinkRunner. Which
>> > >>> mandates that my implementation must also implement SinkFunction<>.
>> > >>> In that case, none of the Sink<> methods get called anyway.
>> >
>> > I am using FlinkRunner. The Sink implementation that I was writing by
>> > extending the Sink<> class had to implement the Flink-specific
>> > SinkFunction for the correct translation.
>> >
>> > private static class WriteSinkStreamingTranslator implements
>> >     FlinkStreamingPipelineTranslator.StreamTransformTranslator {
>> >
>> >   @Override
>> >   public void translateNode(Write.Bound transform,
>> >       FlinkStreamingTranslationContext context) {
>> >     String name = transform.getName();
>> >     PValue input = context.getInput(transform);
>> >
>> >     Sink sink = transform.getSink();
>> >     if (!(sink instanceof UnboundedFlinkSink)) {
>> >       throw new UnsupportedOperationException(
>> >           "At the time, only unbounded Flink sinks are supported.");
>> >     }
>> >
>> >     DataStream inputDataSet = context.getInputDataStream(input);
>> >
>> >     inputDataSet.flatMap(new FlatMapFunction() {
>> >       @Override
>> >       public void flatMap(WindowedValue value, Collector out)
>> >           throws Exception {
>> >         out.collect(value.getValue());
>> >       }
>> >     }).addSink(((UnboundedFlinkSink) sink).getFlinkSource()).name(name);
>> >   }
>> > }
>> >
>> >
>> >
>> >
>> > Regards
>> > Sumit Chawla
>> >
>> >
>> > On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
>> > kirpic...@google.com.invalid> wrote:
>> >
>> > > Hi Sumit,
>> > >
>> > > All reusable parts of a pipeline, including connectors to storage
>> > > systems, should be packaged as PTransforms.
>> > >
>> > > Sink is an advanced API that you can use under the hood to implement the
>> > > transform, if this particular connector benefits from this API - but you
>> > > don't have to, and many connectors indeed don't need it, and are simpler
>> > > to implement just as wrappers around a couple of ParDos writing the data.
>> > >
>> > > Even if the connector is implemented using a Sink, packaging the
>> > > connector as a PTransform is important because it's easier to apply in a
>> > > pipeline and because it's more future-proof (the author of the connector
>> > > may later change it to use something else rather than Sink under the
>> > > hood without breaking existing users).
>> > >
>> > > Sink is, currently, useful in the following cases:
>> > > - You're writing a bounded amount of data (we do not yet have an
>> > >   unbounded Sink analogue)
>> > > - The location you're writing to is known at pipeline construction time,
>> > >   and does not depend on the data itself (support for "data-dependent"
>> > >   sinks is on the radar https://issues.apache.org/jira/browse/BEAM-92)
>> > > - The storage system you're writing to has a distinct "initialization"
>> > >   and "finalization" step, allowing the write operation to appear atomic
>> > >   (either all data is written or none). This mostly applies to files
>> > >   (where writing is done by first writing to a temporary directory, and
>> > >   then renaming all files to their final location), but there can be
>> > >   other cases too.
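The "initialization/finalization" file pattern described above (write to a temporary location, then rename into place) can be sketched in plain Java. This is an illustrative stand-alone sketch, not Beam's actual FileBasedSink code; the paths and names are made up.

```java
// Hedged sketch of the temp-then-rename finalization pattern: data is first
// written to a temporary file, then atomically moved to its final location,
// so readers never observe a partially written file.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

public class AtomicWriteSketch {

    /** Writes content to finalPath via a temporary file plus an atomic rename. */
    static void writeAtomically(Path finalPath, String content) throws IOException {
        // "Initialization": write everything to a temp file in the same directory,
        // so the final rename stays within one file system.
        Path tmp = Files.createTempFile(finalPath.getParent(), "part-", ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // "Finalization": a single rename makes the result visible all at once.
        Files.move(tmp, finalPath,
                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sink-demo");
        Path out = dir.resolve("output.txt");
        writeAtomically(out, "all or nothing");
        System.out.println(new String(Files.readAllBytes(out), StandardCharsets.UTF_8));
    }
}
```

A failed write leaves at worst a stray temp file, never a truncated final file, which is what makes the overall operation appear atomic to downstream readers.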
>> > >
>> > > Here's an example GCP 

Re: OldDoFn - CounterSet replacement

2016-08-17 Thread Ben Chambers
Ah, I see. I wasn't aware this was in the context of a new runner.

So -- the new mechanism makes it harder to do the "wrong thing" (not wire
up aggregators). But, since your goal is to get a baseline without proper
support for aggregators, you should be able to create a NoopAggregator
(that ignores the values) and a NoopAggregatorFactory (that creates
NoopAggregators). That should establish your baseline, while also making it
really clear that aggregators are not supported.
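The no-op suggestion above can be sketched as follows. The `Aggregator` and `AggregatorFactory` interfaces here are simplified hypothetical stand-ins for Beam's (the real ones also carry a step name and a CombineFn); only the shape of the pattern is intended.

```java
// Hedged sketch of the NoopAggregator idea, using simplified stand-ins for
// Beam's Aggregator/AggregatorFactory interfaces (not the real signatures).
public class NoopAggregatorSketch {

    interface Aggregator<T> {
        void addValue(T value);
    }

    interface AggregatorFactory {
        <T> Aggregator<T> createAggregator(String name);
    }

    /** Accepts values and silently discards them. */
    static class NoopAggregator<T> implements Aggregator<T> {
        @Override
        public void addValue(T value) {
            // Intentionally empty: aggregators are not supported yet.
        }
    }

    /** Hands out NoopAggregators, making the lack of support explicit. */
    static class NoopAggregatorFactory implements AggregatorFactory {
        @Override
        public <T> Aggregator<T> createAggregator(String name) {
            return new NoopAggregator<>();
        }
    }

    public static void main(String[] args) {
        AggregatorFactory factory = new NoopAggregatorFactory();
        Aggregator<Long> counter = factory.createAggregator("elements");
        counter.addValue(42L);  // accepted, but goes nowhere
        System.out.println("no-op aggregator created");
    }
}
```

This keeps the DoFnRunner wiring satisfied (no NullPointerException on aggregator creation) while making the missing functionality obvious from the class names.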

Also note that as part of looking at
https://issues.apache.org/jira/browse/BEAM-147 and
https://issues.apache.org/jira/browse/BEAM-458 aggregators may be getting
simpler to support, so let us know before you spend a lot of time actually
wiring them up.

On Wed, Aug 17, 2016 at 8:43 AM Thomas Weise  wrote:

> Hi Ben,
>
> Thanks for the reply. Here is the PR:
>
> https://github.com/apache/incubator-beam/pull/540
>
> The doFnRunner instantiation in old style is here:
>
>
> https://github.com/apache/incubator-beam/pull/540/files#diff-86746f538c22ebafd06fca17f0d0aa94R116
>
> I should also note that the focus of the PR is to establish the Apex runner
> baseline; proper support for aggregators isn't part of it, and is
> something I was planning to take up in a subsequent round.
>
> Thomas
>
>
> On Wed, Aug 17, 2016 at 8:14 AM, Ben Chambers 
> wrote:
>
> > Hi Thomas!
> >
> > On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise 
> > wrote:
> >
> > > I'm trying to rebase a PR and adjust for the DoFn changes.
> > >
> >
> > Can you elaborate on what you're trying to do (or send a link to the PR)?
> >
> >
> > > CounterSet is gone and there is now AggregatorFactory and I'm looking to
> > > fix an existing usage of org.apache.beam.sdk.util.DoFnRunners.simpleRunner.
> > >
> >
> > In practice, these should act the same. CounterSet was an implementation
> > detail used to create implementation-specific Counters. The DoFnRunner
> was
> > supposed to get the CounterSet that was wired up correctly. Now, the
> > AggregatorFactory serves the role of creating wired-up Aggregators. As
> > before, the DoFnRunner should be instantiated with an AggregatorFactory
> > wired up appropriately.
> >
> >
> > > Given the instance of OldDoFn, what is the recommended way to obtain
> the
> > > aggregator factory when creating the fn runner?
> > >
> >
> > This should come from the runner. When the runner wants to instantiate a
> > DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
> > will wire up aggregators appropriately.
> >
> >
> > > Thanks!
> > >
> > >
> > > java.lang.NullPointerException
> > > at org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.createAggregatorInternal(DoFnRunnerBase.java:348)
> > > at org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(OldDoFn.java:224)
> > > at org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(OldDoFn.java:215)
> > > at org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<init>(DoFnRunnerBase.java:214)
> > > at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(DoFnRunnerBase.java:87)
> > > at org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:42)
> > > at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(DoFnRunners.java:60)
> > >
> >
>


Re: OldDoFn - CounterSet replacement

2016-08-17 Thread Thomas Weise
Hi Ben,

Thanks for the reply. Here is the PR:

https://github.com/apache/incubator-beam/pull/540

The doFnRunner instantiation in old style is here:

https://github.com/apache/incubator-beam/pull/540/files#diff-86746f538c22ebafd06fca17f0d0aa94R116

I should also note that the focus of the PR is to establish the Apex runner
baseline; proper support for aggregators isn't part of it, and is
something I was planning to take up in a subsequent round.

Thomas


On Wed, Aug 17, 2016 at 8:14 AM, Ben Chambers  wrote:

> Hi Thomas!
>
> On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise 
> wrote:
>
> > I'm trying to rebase a PR and adjust for the DoFn changes.
> >
>
> Can you elaborate on what you're trying to do (or send a link to the PR)?
>
>
> > CounterSet is gone and there is now AggregatorFactory and I'm looking to
> > fix an existing usage of org.apache.beam.sdk.util.DoFnRunners.simpleRunner.
> >
>
> In practice, these should act the same. CounterSet was an implementation
> detail used to create implementation-specific Counters. The DoFnRunner was
> supposed to get the CounterSet that was wired up correctly. Now, the
> AggregatorFactory serves the role of creating wired-up Aggregators. As
> before, the DoFnRunner should be instantiated with an AggregatorFactory
> wired up appropriately.
>
>
> > Given the instance of OldDoFn, what is the recommended way to obtain the
> > aggregator factory when creating the fn runner?
> >
>
> This should come from the runner. When the runner wants to instantiate a
> DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
> will wire up aggregators appropriately.
>
>
> > Thanks!
> >
> >
> > java.lang.NullPointerException
> > at org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.createAggregatorInternal(DoFnRunnerBase.java:348)
> > at org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(OldDoFn.java:224)
> > at org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(OldDoFn.java:215)
> > at org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<init>(DoFnRunnerBase.java:214)
> > at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(DoFnRunnerBase.java:87)
> > at org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:42)
> > at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(DoFnRunners.java:60)
> >
>


Re: OldDoFn - CounterSet replacement

2016-08-17 Thread Ben Chambers
Hi Thomas!

On Tue, Aug 16, 2016 at 9:40 PM Thomas Weise  wrote:

> I'm trying to rebase a PR and adjust for the DoFn changes.
>

Can you elaborate on what you're trying to do (or send a link to the PR)?


> CounterSet is gone and there is now AggregatorFactory and I'm looking to
> fix an existing usage of org.apache.beam.sdk.util.DoFnRunners.simpleRunner.
>

In practice, these should act the same. CounterSet was an implementation
detail used to create implementation-specific Counters. The DoFnRunner was
supposed to get the CounterSet that was wired up correctly. Now, the
AggregatorFactory serves the role of creating wired-up Aggregators. As
before, the DoFnRunner should be instantiated with an AggregatorFactory
wired up appropriately.
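As a rough illustration of the wiring described above, here is a self-contained sketch with simplified stand-in interfaces (these are not Beam's actual Aggregator/AggregatorFactory signatures): the runner owns a counter backend, and the factory it passes to each DoFnRunner creates aggregators that feed that backend.

```java
// Hedged sketch of runner-side aggregator wiring: the runner owns the counter
// backend, and the factory it hands to each DoFnRunner creates aggregators
// that land values in that backend. Simplified stand-in interfaces only.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class WiredAggregatorSketch {

    interface Aggregator<T> {
        void addValue(T value);
    }

    /** Runner-owned backend: one LongAdder per aggregator name. */
    static class CounterBackend {
        final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

        long value(String name) {
            LongAdder a = counters.get(name);
            return a == null ? 0L : a.sum();
        }
    }

    /** Factory the runner hands to each DoFnRunner; aggregators feed the backend. */
    static class WiredAggregatorFactory {
        final CounterBackend backend;

        WiredAggregatorFactory(CounterBackend backend) { this.backend = backend; }

        Aggregator<Long> createLongSumAggregator(String name) {
            LongAdder adder = backend.counters.computeIfAbsent(name, n -> new LongAdder());
            return adder::add;  // each addValue call lands in the runner's counter
        }
    }

    public static void main(String[] args) {
        CounterBackend backend = new CounterBackend();
        WiredAggregatorFactory factory = new WiredAggregatorFactory(backend);
        Aggregator<Long> agg = factory.createLongSumAggregator("emitted");
        agg.addValue(2L);
        agg.addValue(3L);
        System.out.println(backend.value("emitted"));  // prints 5
    }
}
```

Swapping `WiredAggregatorFactory` for a `NoopAggregatorFactory` (as suggested earlier in the thread) is then a one-line change at the point where the runner constructs the DoFnRunner.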


> Given the instance of OldDoFn, what is the recommended way to obtain the
> aggregator factory when creating the fn runner?
>

This should come from the runner. When the runner wants to instantiate a
DoFnRunner to execute a user DoFn, it provides an AggregatorFactory that
will wire up aggregators appropriately.


> Thanks!
>
>
> java.lang.NullPointerException
> at org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.createAggregatorInternal(DoFnRunnerBase.java:348)
> at org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregator(OldDoFn.java:224)
> at org.apache.beam.sdk.transforms.OldDoFn$Context.setupDelegateAggregators(OldDoFn.java:215)
> at org.apache.beam.sdk.util.DoFnRunnerBase$DoFnContext.<init>(DoFnRunnerBase.java:214)
> at org.apache.beam.sdk.util.DoFnRunnerBase.<init>(DoFnRunnerBase.java:87)
> at org.apache.beam.sdk.util.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:42)
> at org.apache.beam.sdk.util.DoFnRunners.simpleRunner(DoFnRunners.java:60)
>