I don't mean to derail the tricky environment questions, but I'm not seeing why this is bundled as an IO rather than a plain DoFn (which can be applied to a PCollection of one or more commands, yielding their outputs). Especially for Read, which here is not splittable (initially or dynamically) and always produces a single element--it feels much more like a Map to me.
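Concretely, a minimal sketch of that DoFn might look like the following (hypothetical names, not the PR's code, and assuming /bin/sh is available on the worker -- which is exactly the environment question under discussion):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.beam.sdk.transforms.DoFn;

    /** Hypothetical sketch: run each input element as a shell command. */
    class RunCommandFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Assumes /bin/sh exists on the worker.
        ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", c.element());
        pb.redirectErrorStream(true); // merge stderr into stdout
        Process process = pb.start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) {
            output.append(line).append('\n');
          }
        }
        int exitCode = process.waitFor();
        if (exitCode != 0) {
          throw new RuntimeException("Command exited with code " + exitCode);
        }
        c.output(output.toString()); // the command's combined stdout/stderr
      }
    }

Applied as pipeline.apply(Create.of(commands)).apply(ParDo.of(new RunCommandFn())), which is all a single-command Read would need.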
On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> Ben - the issues of "things aren't hung, there is a shell command running", aren't they general to all DoFns? I.e. I don't see why the runner would need to know that a shell command is running, but not that, say, a heavy monolithic computation is running. What's the benefit to the runner in knowing that the DoFn contains a shell command?
>
> By saying "making sure that all shell commands finish", I suppose you're referring to the possibility of leaks if the user initiates a shell command and forgets to wait for it? I think that should be solvable, again without Beam intervention, by making a utility class for running shell commands which implements AutoCloseable, and documenting that you have to use it that way.
>
> Ken - I think the question here is: are we ok with a situation where the runner doesn't check or care whether the shell command can run, and the user accepts this risk and studies what commands will be available in the worker environment provided by the runner they use in production, before productionizing a pipeline with those commands.
>
> Upon some thought I think it's ok. Of course, this carries an obligation for runners to document their worker environment and its changes across versions. Though for many runners such documentation may be trivial: "whatever your YARN cluster has, the runner doesn't change it in any way", and that may be good enough for users. And for other runners, like Dataflow, such documentation may also be trivial: "no guarantees whatsoever, only what you stage in --filesToStage is available".
>
> I can also see Beam developing to a point where we'd want all runners to be able to run your DoFn in a user-specified Docker container, and manage those intelligently - but I think that's quite a while away and it doesn't have to block work on a utility for executing shell commands. Though it'd be nice if the utility were forward-compatible with that future world.
>
> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> Hi Eugene,
>>
>> Thanks for the extended questions.
>>
>> I think we have two levels of expectations here:
>> - end-user responsibility
>> - worker/runner responsibility
>>
>> 1/ From an end-user perspective, the end user has to know that using a system command (via ExecIO) - and, more generally, anything which relies on worker resources (for instance a local filesystem directory available only on a worker) - can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology. That's what I'm assuming for the PR.
>> For example, if I have my Spark cluster using the same Mesos/Docker setup, then the user knows that all nodes in the cluster will have the same setup and thus the same resources (it could be provided by DevOps, for instance).
>> On the other hand, running on Dataflow is different because I don't "control" the nodes (bootstrapping or resources), but in that case the user knows it (he knows the runner he's using).
>>
>> 2/ As you said, we can expect the runner to deal with some requirements (expressed depending on the pipeline and the runner), so that the runner knows which workers provide capabilities matching those requirements. Then the end user is no longer responsible: the runner will try to determine whether the pipeline can be executed, and where (on which worker) a DoFn has to run.
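Re the AutoCloseable utility Eugene describes above, a minimal sketch (hypothetical names, again assuming /bin/sh) might be: try-with-resources guarantees the child process is reaped or destroyed even if the user forgets to wait for it.

    import java.io.IOException;

    /** Hypothetical sketch of an AutoCloseable shell-command wrapper. */
    class ShellCommand implements AutoCloseable {
      private final Process process;

      ShellCommand(String command) throws IOException {
        process = new ProcessBuilder("/bin/sh", "-c", command).start();
      }

      int waitForExit() throws InterruptedException {
        return process.waitFor();
      }

      @Override
      public void close() {
        if (process.isAlive()) {
          process.destroy(); // best effort: never leak a running command
        }
      }
    }

Used as:

    try (ShellCommand cmd = new ShellCommand("/path/to/myscript.sh")) {
      int exitCode = cmd.waitForExit();
    }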
>>
>> For me, it's two different levels, where 2 is smarter but 1 can also make sense.
>>
>> WDYT?
>>
>> Regards
>> JB
>>
>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
>> > Hi JB,
>> >
>> > Thanks for bringing this to the mailing list. I also think that this is useful in general (and that use cases for Beam are more than just classic bigdata), and that there are interesting questions here at different levels about how to do it right.
>> >
>> > I suggest starting with the highest-level question [and discussing the particular API only after agreeing on this, possibly in a separate thread]: how to deal with the fact that Beam gives no guarantees about the environment on workers, e.g. which commands are available, which shell or even OS is being used, etc. Particularly:
>> >
>> > - Obviously different runners will have different environments, e.g. Dataflow workers are not going to have Hadoop commands available because they are not running on a Hadoop cluster. So pipelines and transforms developed using this connector will necessarily be non-portable between different runners. Maybe this is ok? But we need to give users a clear expectation about this. How do we phrase this expectation, and where do we put it in the docs?
>> >
>> > - I'm concerned that this puts additional compatibility requirements on runners - it becomes necessary for a runner to document the environment of its workers (OS, shell, privileges, guaranteed-installed packages, access to other things on the host machine, e.g. whether or not the worker runs in its own container, etc.) and to keep it stable - otherwise transforms and pipelines with this connector will be non-portable between runner versions as well.
>> >
>> > Another way to deal with this is to give up and say "the environment on the workers is outside the scope of Beam; consult your runner's documentation or use your best judgment as to what the environment will be, and use this at your own risk".
>> >
>> > What do others think?
>> >
>> > On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >
>> > Hi beamers,
>> >
>> > Today, Beam is mainly focused on data processing.
>> > Since the beginning of the project, we have been discussing extending the use-case coverage via DSLs and extensions (like for machine learning), or via IOs.
>> >
>> > Especially through IOs, we can see Beam used for data integration and data ingestion.
>> >
>> > In this area, I'm proposing a first IO: ExecIO:
>> >
>> > https://issues.apache.org/jira/browse/BEAM-1059
>> > https://github.com/apache/incubator-beam/pull/1451
>> >
>> > Actually, this IO is mainly an ExecFn that executes system commands (again, keep in mind we are discussing data integration/ingestion and not data processing).
>> >
>> > For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).
>> >
>> > Clearly, this IO/Fn depends on the worker where it runs. But that's the user's responsibility.
>> >
>> > During the review, Eugene and I discussed:
>> > - is it an IO or just a fn?
>> > - is it OK to have worker-specific IO?
>> >
>> > IMHO, an IO makes a lot of sense and is very convenient for end users.
>> > They can do something like:
>> >
>> > PCollection<String> output =
>> >     pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
>> >
>> > The pipeline will execute myscript, and the output PCollection will contain the command's stdout/stderr.
>> >
>> > On the other hand, they can do:
>> >
>> > pcollection.apply(ExecIO.write());
>> >
>> > where the PCollection contains the commands to execute.
>> >
>> > Generally speaking, end users can call ExecFn wherever they want in the pipeline steps:
>> >
>> > PCollection<String> output = pcollection.apply(ParDo.of(new ExecIO.ExecFn()));
>> >
>> > The input collection contains the commands to execute, and the output collection contains the commands' execution results (stdout/stderr).
>> >
>> > Generally speaking, I'm preparing several IOs more in the data integration/ingestion area than in "pure" classic big data processing. I think it would give a new "dimension" to Beam.
>> >
>> > Thoughts?
>> >
>> > Regards
>> > JB
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
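To make the Read case above concrete: assuming the hypothetical RunCommandFn sketched near the top of this message (not the PR's actual ExecFn), a single-command Read is expressible as Create + ParDo, which is what makes it feel like a Map rather than a source:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class ExecReadAsParDo {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical expansion of ExecIO.read().withCommand(...):
        // a one-element "source" is just Create.of(command) + ParDo(fn).
        PCollection<String> output = pipeline
            .apply(Create.of("/path/to/myscript.sh"))
            .apply(ParDo.of(new RunCommandFn())); // sketched earlier in this message

        pipeline.run();
      }
    }

The same ParDo handles the many-commands case with no changes, since each input element is executed independently.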