Runners will be able to determine whether or not they can execute a
pipeline based on these requirements. The details are rather the domain of
the Fn API design.

On Mon, Dec 5, 2016 at 1:38 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> @Kenn - Would you suggest that all runners need to support running code in
> a user-specified container?
> @Ben - Hmm, the features you're suggesting don't seem to require
> deep integration into Beam itself; they could be accomplished by separate
> utility functions (or perhaps regular language-specific facilities like
> Java's ProcessBuilder).
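>
> For illustration, a minimal sketch of that approach (no new Beam API at
> all) could look like the following; the class name, the use of /bin/sh,
> and the error handling are invented for the example and assume a worker
> that actually has a POSIX shell:
>
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.nio.charset.StandardCharsets;
> import java.util.stream.Collectors;
> import org.apache.beam.sdk.transforms.DoFn;
>
> public class ShellOutFn extends DoFn<String, String> {
>   @ProcessElement
>   public void processElement(ProcessContext c) throws Exception {
>     // Treat each input element as a command line to run on the worker.
>     Process process = new ProcessBuilder("/bin/sh", "-c", c.element())
>         .redirectErrorStream(true)  // merge stderr into stdout
>         .start();
>     String output;
>     try (BufferedReader reader = new BufferedReader(
>         new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
>       output = reader.lines().collect(Collectors.joining("\n"));
>     }
>     int exitCode = process.waitFor();
>     if (exitCode != 0) {
>       throw new RuntimeException("Command failed with exit code " + exitCode);
>     }
>     c.output(output);
>   }
> }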
>
> On Mon, Dec 5, 2016 at 1:21 PM Ben Chambers <bchamb...@apache.org> wrote:
>
> One option would be to use the reflective DoFn approach to this. Imagine
> something like:
>
> public class MyExternalFn extends DoFn {
>   @ProcessElement
>   // Existence of ShellExecutor indicates the code shells out.
>   public void processElement(ProcessContext c, ShellExecutor shell) {
>     ...
>     Future<ShellResult> result = shell.executeAsync("...");
>     ...
>     c.output(result.get());
>   }
> }
>
> The API for the shell can include non-future methods, but this approach
> lets the runner know which calls interact with the shell, better report
> upwards when (1) the shell process fails or (2) the shell process hangs
> forever, and (3) manage the parallelism of interactions with the shell.
>
> Requirements for the executor can be specified with an annotation on the
> parameter or via an annotation within the DoFn.
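>
> Purely as an illustration of that idea (nothing below exists in the SDK;
> @RequiresEnvironment, ShellExecutor and ShellResult are hypothetical names,
> so this is a schematic sketch rather than compilable code):
>
> public class MyExternalFn extends DoFn<String, String> {
>   @ProcessElement
>   public void processElement(
>       ProcessContext c,
>       // Hypothetical parameter annotation declaring what the worker must provide.
>       @RequiresEnvironment(os = "linux", tools = {"bash", "ffmpeg"}) ShellExecutor shell)
>       throws Exception {
>     // The runner sees the requirement in the graph and can schedule (or reject)
>     // the DoFn accordingly; the executor reports failures and hangs upwards.
>     Future<ShellResult> result = shell.executeAsync("ffmpeg -i " + c.element());
>     c.output(result.get().getStdout());
>   }
> }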
>
> On Mon, Dec 5, 2016 at 1:15 PM Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > I would like the runner-independent, language-independent graph to have
> > a way to specify requirements on the environment that a DoFn runs in.
> > This would provide a natural way to talk about installed libraries,
> > containers, external services that are accessed, etc., and I think the
> > requirement of a particular OS with tools installed fits right in. At
> > the crudest level, this could be limited to a container URL.
> >
> > Then the Java SDK needs a way to express these requirements. They will
> > generally be properties of a DoFn instance rather than of a DoFn class,
> > since they may vary with instantiation parameters.
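> >
> > As a purely hypothetical sketch of "requirements as properties of a DoFn
> > instance" in the Java SDK (EnvironmentRequirements and the
> > getEnvironmentRequirements() hook are invented names, not existing API,
> > so this is not compilable as-is):
> >
> > public class TranscodeFn extends DoFn<String, String> {
> >   // Varies per instantiation, so it is instance state rather than
> >   // something attached to the DoFn class.
> >   private final String containerUrl;
> >
> >   public TranscodeFn(String containerUrl) {
> >     this.containerUrl = containerUrl;
> >   }
> >
> >   // The SDK could collect this while building the runner-independent,
> >   // language-independent graph; a runner can then decide whether it is
> >   // able to satisfy the requirement before executing the pipeline.
> >   public EnvironmentRequirements getEnvironmentRequirements() {
> >     return EnvironmentRequirements.container(containerUrl);
> >   }
> >
> >   @ProcessElement
> >   public void processElement(ProcessContext c) {
> >     // ... assumes the requested container/tools are present on the worker ...
> >     c.output(c.element());
> >   }
> > }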
> >
> > On Mon, Dec 5, 2016 at 11:51 AM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Hi JB,
> > >
> > > Thanks for bringing this to the mailing list. I also think that this is
> > > useful in general (and that the use cases for Beam are more than just
> > > classic big data), and that there are interesting questions here at
> > > different levels about how to do it right.
> > >
> > > I suggest starting with the highest-level question [and discussing the
> > > particular API only after agreeing on this, possibly in a separate
> > > thread]: how to deal with the fact that Beam gives no guarantees about
> > > the environment on workers, e.g. which commands are available, which
> > > shell or even OS is being used, etc. In particular:
> > >
> > > - Obviously different runners will have different environments, e.g.
> > > Dataflow workers are not going to have Hadoop commands available
> > > because they are not running on a Hadoop cluster. So pipelines and
> > > transforms developed using this connector will necessarily be
> > > non-portable between different runners. Maybe this is OK? But we need
> > > to give users a clear expectation about this. How do we phrase this
> > > expectation and where do we put it in the docs?
> > >
> > > - I'm concerned that this puts additional compatibility requirements
> > > on runners - it becomes necessary for a runner to document the
> > > environment of its workers (OS, shell, privileges, guaranteed-installed
> > > packages, access to other things on the host machine, e.g. whether or
> > > not the worker runs in its own container, etc.) and to keep it stable -
> > > otherwise transforms and pipelines using this connector will not be
> > > portable between runner versions either.
> > >
> > > Another way to deal with this is to give up and say "the environment
> > > on the workers is outside the scope of Beam; consult your runner's
> > > documentation or use your best judgment as to what the environment
> > > will be, and use this at your own risk".
> > >
> > > What do others think?
> > >
> > > On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > > wrote:
> > >
> > > Hi beamers,
> > >
> > > Today, Beam is mainly focused on data processing.
> > > Since the beginning of the project, we have been discussing extending
> > > the use-case coverage via DSLs and extensions (like for machine
> > > learning), or via IO.
> > >
> > > Especially for the IO, we can see Beam being used for data integration
> > > and data ingestion.
> > >
> > > In this area, I'm proposing a first IO: ExecIO:
> > >
> > > https://issues.apache.org/jira/browse/BEAM-1059
> > > https://github.com/apache/incubator-beam/pull/1451
> > >
> > > Actually, this IO is mainly an ExecFn that executes system commands
> > > (again, keep in mind we are talking about data integration/ingestion
> > > and not data processing).
> > >
> > > For convenience, this ExecFn is wrapped in Read and Write (as a regular
> > > IO).
> > >
> > > Clearly, this IO/Fn depends on the worker where it runs, but that is
> > > the user's responsibility.
> > >
> > > During the review, Eugene and I discussed:
> > > - is it an IO or just a Fn?
> > > - is it OK to have a worker-specific IO?
> > >
> > > IMHO, an IO makes a lot of sense and is very convenient for end
> > > users. They can do something like:
> > >
> > > PCollection<String> output =
> > > pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> > >
> > > The pipeline will execute myscript, and the output PCollection will
> > > contain the command's stdout/stderr.
> > >
> > > On the other hand, they can do:
> > >
> > > pcollection.apply(ExecIO.write());
> > >
> > > where the input PCollection contains the commands to execute.
> > >
> > > Generally speaking, end users can call ExecFn wherever they want in
> > > the pipeline:
> > >
> > > PCollection<String> output = input.apply(ParDo.of(new ExecIO.ExecFn()));
> > >
> > > The input collection contains the commands to execute, and the output
> > > collection contains each command's stdout/stderr.
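> > >
> > > Put together, a small pipeline using the proposed API might look like
> > > this (a sketch only; it assumes the ExecIO signatures from the PR
> > > above, which may still change):
> > >
> > > import org.apache.beam.sdk.Pipeline;
> > > import org.apache.beam.sdk.options.PipelineOptionsFactory;
> > > import org.apache.beam.sdk.transforms.Create;
> > > import org.apache.beam.sdk.transforms.ParDo;
> > > import org.apache.beam.sdk.values.PCollection;
> > > // ExecIO import omitted: its package is whatever the PR above defines.
> > >
> > > public class ExecIOExample {
> > >   public static void main(String[] args) {
> > >     Pipeline pipeline =
> > >         Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
> > >
> > >     // Read form: run one script on a worker, collect its stdout/stderr.
> > >     PCollection<String> scriptOutput =
> > >         pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> > >
> > >     // Fn form: each element of the input collection is a command to run.
> > >     PCollection<String> results =
> > >         pipeline.apply(Create.of("hostname", "date"))
> > >                 .apply(ParDo.of(new ExecIO.ExecFn()));
> > >
> > >     pipeline.run();
> > >   }
> > > }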
> > >
> > > Generally speaking, I'm preparing several IOs that are more in the
> > > data integration/ingestion area than in "pure" classic big data
> > > processing. I think it would give a new "dimension" to Beam.
> > >
> > > Thoughts?
> > >
> > > Regards
> > > JB
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>
