Runners will be able to determine whether they can execute a pipeline based on these requirements. The details belong to the Fn API design.
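[For illustration only, a minimal sketch of what declaring environment requirements on a DoFn and checking them on the runner side could look like. The @RequiresEnvironment annotation and the EnvironmentCheck helper below are hypothetical names, not existing Beam API; the real mechanism would live in the runner-independent graph / Fn API.]

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

// Hypothetical annotation (not part of the Beam SDK) describing what a DoFn
// needs from the worker environment. A runner could read this requirement from
// the runner-independent graph and reject the pipeline if it cannot satisfy it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RequiresEnvironment {
  String container() default "";
  String[] commands() default {};
}

// Hypothetical runner-side capability check.
class EnvironmentCheck {
  static boolean canRun(Class<?> doFnClass, List<String> availableCommands) {
    RequiresEnvironment req = doFnClass.getAnnotation(RequiresEnvironment.class);
    if (req == null) {
      return true; // No declared requirements; assume the DoFn is portable.
    }
    return availableCommands.containsAll(Arrays.asList(req.commands()));
  }
}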
On Mon, Dec 5, 2016 at 1:38 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

@Kenn - Would you suggest that all runners need to support running code in a user-specified container?
@Ben - Hmm, the features you're suggesting don't seem to require deep integration into Beam itself; they could be accomplished by separate utility functions (or perhaps regular language-specific facilities like Java's ProcessBuilder).

On Mon, Dec 5, 2016 at 1:21 PM Ben Chambers <bchamb...@apache.org> wrote:

One option would be to use the reflective DoFn approach for this. Imagine something like:

public class MyExternalFn extends DoFn<String, String> {
  @ProcessElement
  // The presence of the ShellExecutor parameter indicates the code shells out.
  public void processElement(ProcessContext c, ShellExecutor shell) {
    ...
    Future<ShellResult> result = shell.executeAsync("...");
    ...
    c.output(result.get());
  }
}

The API for the shell can include non-future methods as well. This approach lets the runner know which steps interact with the shell, so it can (1) report when a shell process fails, (2) detect when a shell process hangs forever and surface that upwards, and (3) manage the parallelism of the shell interactions.

Requirements for the executor can be specified with an annotation on the parameter or via an annotation within the DoFn.

On Mon, Dec 5, 2016 at 1:15 PM Kenneth Knowles <k...@google.com.invalid> wrote:

I would like the runner-independent, language-independent graph to have a way to specify requirements on the environment that a DoFn runs in. This would provide a natural way to talk about installed libraries, containers, external services that are accessed, etc., and I think a requirement for a particular OS with particular tools installed fits right in. At the crudest level, this could be limited to a container URL.

Then the Java SDK needs a way to express these requirements. They will generally be properties of a DoFn instance rather than a DoFn class, since they may vary with instantiation parameters.

On Mon, Dec 5, 2016 at 11:51 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Hi JB,

Thanks for bringing this to the mailing list. I also think this is useful in general (and that the use cases for Beam are more than just classic big data), and that there are interesting questions here, at different levels, about how to do it right.

I suggest starting with the highest-level question [and discussing the particular API only after agreeing on this, possibly in a separate thread]: how to deal with the fact that Beam gives no guarantees about the environment on workers, e.g. which commands are available, which shell or even which OS is being used, etc. Particularly:

- Obviously different runners will have different environments, e.g. Dataflow workers are not going to have Hadoop commands available because they are not running on a Hadoop cluster. So pipelines and transforms developed using this connector will necessarily be non-portable between runners. Maybe this is OK? But we need to give users a clear expectation about this. How do we phrase this expectation and where do we put it in the docs?

- I'm concerned that this puts additional compatibility requirements on runners: it becomes necessary for a runner to document the environment of its workers (OS, shell, privileges, guaranteed-installed packages, access to other things on the host machine, e.g. whether or not the worker runs in its own container, etc.) and to keep it stable - otherwise transforms and pipelines using this connector will not even be portable between versions of the same runner.

Another way to deal with this is to give up and say "the environment on the workers is outside the scope of Beam; consult your runner's documentation or use your best judgment as to what the environment will be, and use this at your own risk".

What do others think?

On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi beamers,

Today, Beam is mainly focused on data processing. Since the beginning of the project, we have been discussing extending the use-case coverage via DSLs and extensions (like for machine learning), or via IOs.

Especially for the IOs, we can see Beam used for data integration and data ingestion.

In this area, I'm proposing a first IO: ExecIO:

https://issues.apache.org/jira/browse/BEAM-1059
https://github.com/apache/incubator-beam/pull/1451

Actually, this IO is mainly an ExecFn that executes system commands (again, keep in mind we are discussing data integration/ingestion and not data processing).

For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).

Clearly, this IO/Fn depends on the worker where it runs, but that is the user's responsibility.

During the review, Eugene and I discussed:
- is it an IO or just a fn?
- is it OK to have worker-specific IO?

IMHO, an IO makes a lot of sense and is very convenient for end users. They can do something like:

PCollection<String> output =
    pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

The pipeline will execute myscript.sh and the output collection will contain the command's stdout/stderr.

On the other hand, they can do:

pcollection.apply(ExecIO.write());

where the PCollection contains the commands to execute.

Generally speaking, end users can call ExecFn wherever they want in the pipeline steps:

PCollection<String> output = pcollection.apply(ParDo.of(new ExecIO.ExecFn()));

The input collection contains the commands to execute, and the output collection contains the commands' execution results (stdout/stderr).

Generally speaking, I'm preparing several IOs that are more in the data integration/ingestion area than in "pure" classic big data processing. I think it would give a new "dimension" to Beam.

Thoughts?

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
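[For reference, a minimal sketch of what the core of such an ExecFn could look like using Java's ProcessBuilder, the facility Eugene mentions. This is not the actual code from the PR; it assumes each String element is a shell command run via "sh -c", merges stderr into stdout, and fails on a non-zero exit code.]

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.DoFn;

public class ExecFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // Each element is treated as a shell command; this assumes a POSIX shell
    // exists on the worker, which is exactly the portability concern above.
    Process process = new ProcessBuilder("sh", "-c", c.element())
        .redirectErrorStream(true) // merge stderr into stdout
        .start();
    String output;
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      output = reader.lines().collect(Collectors.joining("\n"));
    }
    int exitCode = process.waitFor();
    if (exitCode != 0) {
      throw new RuntimeException(
          "Command failed with exit code " + exitCode + ": " + c.element());
    }
    c.output(output);
  }
}

[Note that the shell interaction here is hidden inside the DoFn and blocks per element; Ben's ShellExecutor-parameter proposal would instead expose that interaction to the runner so it can report failures and hangs and manage parallelism.]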