One option would be to use the reflective DoFn approach for this. Imagine something like:
public class MyExternalFn extends DoFn {
  @ProcessElement
  // Existence of ShellExecutor indicates the code shells out.
  public void processElement(ProcessContext c, ShellExecutor shell) {
    ...
    Future<ShellResult> result = shell.executeAsync("...");
    ...
    c.output(result.get());
  }
}

The API for the shell can include non-future methods as well, but this approach lets
the runner know which code interacts with the shell, so it can (1) report when the
shell process fails, (2) detect when the shell process hangs forever and surface that
upwards, and (3) manage the parallelism of interactions with the shell. Requirements
for the executor could be specified with an annotation on the parameter or via an
annotation within the DoFn; a rough sketch of that is inline below, after Eugene's
mail.

On Mon, Dec 5, 2016 at 1:15 PM Kenneth Knowles <k...@google.com.invalid> wrote:

> I would like the runner-independent, language-independent graph to have a
> way to specify requirements on the environment that a DoFn runs in. This
> would provide a natural way to talk about installed libraries, containers,
> external services that are accessed, etc., and I think the requirement of
> a particular OS with tools installed fits right in. At the crudest level,
> this could be limited to a container URL.
>
> Then the Java SDK needs a way to express these requirements. They will
> probably be properties of a DoFn instance rather than a DoFn class, since
> they may vary with instantiation parameters.
>
> On Mon, Dec 5, 2016 at 11:51 AM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Hi JB,
> >
> > Thanks for bringing this to the mailing list. I also think that this is
> > useful in general (and that use cases for Beam are more than just
> > classic big data), and that there are interesting questions here at
> > different levels about how to do it right.
> >
> > I suggest starting with the highest-level question [and discussing the
> > particular API only after agreeing on this, possibly in a separate
> > thread]: how to deal with the fact that Beam gives no guarantees about
> > the environment on workers, e.g. which commands are available, which
> > shell or even OS is being used, etc. Particularly:
> >
> > - Obviously different runners will have different environments, e.g.
> > Dataflow workers are not going to have Hadoop commands available
> > because they are not running on a Hadoop cluster. So pipelines and
> > transforms developed using this connector will necessarily be
> > non-portable between different runners. Maybe this is OK? But we need
> > to give users a clear expectation about this. How do we phrase this
> > expectation and where do we put it in the docs?
> >
> > - I'm concerned that this puts additional compatibility requirements on
> > runners - it becomes necessary for a runner to document the environment
> > of its workers (OS, shell, privileges, guaranteed-installed packages,
> > access to other things on the host machine, e.g. whether or not the
> > worker runs in its own container, etc.) and to keep it stable -
> > otherwise transforms and pipelines with this connector will not be
> > portable between runner versions either.
> >
> > Another way to deal with this is to give up and say "the environment on
> > the workers is outside the scope of Beam; consult your runner's
> > documentation or use your best judgment as to what the environment will
> > be, and use this at your own risk".
> >
> > What do others think?
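To make the annotation idea above concrete, here is a rough sketch of what declaring
requirements on a DoFn could look like. Nothing here exists in the Beam SDK today:
@RequiresEnvironment is a made-up annotation, and ShellExecutor/ShellResult are the
hypothetical types from the sketch at the top of this mail.

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.Future;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical annotation (not in the Beam SDK): declares what the DoFn needs
// from the worker environment, so a runner can check or provision it up front.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.PARAMETER})
@interface RequiresEnvironment {
  String[] commands() default {};  // binaries expected on the worker's PATH
  String container() default "";   // or, at the crudest level, a container URL
}

@RequiresEnvironment(commands = {"bash", "hadoop"})
public class MyExternalFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c, ShellExecutor shell) throws Exception {
    // ShellExecutor and ShellResult are the hypothetical shell API from above.
    Future<ShellResult> result = shell.executeAsync(c.element());
    c.output(result.get().stdout());
  }
}

A runner could read the annotation reflectively off the DoFn class (or off the
@ProcessElement signature) before placing the work, which covers the per-class case;
per-instance requirements, as Kenn notes, would need to be regular properties of the
DoFn object instead.
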
> > On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> > Hi beamers,
> >
> > Today, Beam is mainly focused on data processing.
> > Since the beginning of the project, we have been discussing extending
> > the use case coverage via DSLs and extensions (like for machine
> > learning), or via IO.
> >
> > Especially for IO, we can see Beam used for data integration and data
> > ingestion.
> >
> > In this area, I'm proposing a first IO: ExecIO:
> >
> > https://issues.apache.org/jira/browse/BEAM-1059
> > https://github.com/apache/incubator-beam/pull/1451
> >
> > Actually, this IO is mainly an ExecFn that executes system commands
> > (again, keep in mind we are discussing data integration/ingestion and
> > not data processing).
> >
> > For convenience, this ExecFn is wrapped in Read and Write (as a regular
> > IO).
> >
> > Clearly, this IO/Fn depends on the worker where it runs, but that is
> > the user's responsibility.
> >
> > During the review, Eugene and I discussed:
> > - is it an IO or just a Fn?
> > - is it OK to have worker-specific IO?
> >
> > IMHO, an IO makes a lot of sense and is very convenient for end users.
> > They can do something like:
> >
> > PCollection<String> output =
> >     pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> >
> > The pipeline will execute myscript and the output PCollection will
> > contain the command execution stdout/stderr.
> >
> > On the other hand, they can do:
> >
> > pcollection.apply(ExecIO.write());
> >
> > where the PCollection contains the commands to execute.
> >
> > Generally speaking, end users can call ExecFn wherever they want in the
> > pipeline steps:
> >
> > PCollection<String> output = pipeline.apply(ParDo.of(new ExecIO.ExecFn()));
> >
> > The input collection contains the commands to execute, and the output
> > collection contains the command execution results (stdout/stderr).
> >
> > Generally speaking, I'm preparing several IOs, more in the data
> > integration/ingestion area than in "pure" classic big data processing.
> > I think it would give a new "dimension" to Beam.
> >
> > Thoughts?
> >
> > Regards
> > JB
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
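
For reference, here is JB's proposed API pulled together into one end-to-end sketch.
ExecIO is the API from the PR above, which has not been merged, so the package, exact
method names, and signatures are assumptions taken from his mail:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
// The ExecIO import would come from the proposed PR; its package is not final.

public class ExecIOUsageSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read form: run a single script on a worker and get its stdout/stderr
    // back as a PCollection<String>.
    PCollection<String> scriptOutput =
        pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));

    // Plain ParDo form: each input element is a command to execute; the output
    // collection contains each command's stdout/stderr.
    PCollection<String> commandOutput =
        pipeline
            .apply(Create.of("echo hello", "uname -a"))
            .apply(ParDo.of(new ExecIO.ExecFn()));

    // Write form: execute the incoming elements as commands, discarding output.
    commandOutput.apply(ExecIO.write());

    pipeline.run();
  }
}

Note that the ParDo form is applied to a PCollection of commands (here built with
Create.of), which is what makes it usable at any step of the pipeline.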