Branched off into a separate thread. How about ShellCommands.execute().withCommand("foo")? This is what it is - it executes shell commands :)
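
For concreteness, usage might read something like this (purely a naming sketch - nothing here is implemented):

PCollection<String> output =
    pipeline.apply(ShellCommands.execute().withCommand("/path/to/myscript.sh"));

or, with the commands themselves coming from a PCollection:

PCollection<String> outputs = commands.apply(ShellCommands.execute());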
Say, if I want to just execute a command for the sake of its side effect, but I'm not interested in its output - it would feel odd to describe that as either "reading" from the command or "writing" to it. Likewise, when I execute commands in bash, I'm not thinking of it as reading from or writing to them. That said, there are various modes of interaction with shell commands, and some of them could be called "reading" or "writing" I guess - or both!

- The command itself can be specified at pipeline construction time, or be fully dynamic (elements of a PCollection are themselves commands), or be constructed from a fixed command and a variable set of arguments coming from the PCollection (one-by-one, or xargs-style, cramming as many arguments as fit into the command-line limit).
- We may also be writing elements of the PCollection to the standard input of the command - one-by-one, or in arbitrarily sized batches.
- We may be reading the command's stdout, its stderr, and its exit code.

I think these options call for more flexible naming and a more flexible set of APIs than read and write. And more flexible than a single DoFn, too (which is something I hadn't thought of before - this connector definitely has room for doing some interesting things).

On Wed, Dec 7, 2016 at 1:32 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> By the way, just to elaborate a bit on why I provided it as an IO:
>
> 1. From a user experience perspective, I think we have to provide a convenient way to write pipelines. Any syntax simplifying this is valuable. I think it's easier to write:
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> than:
>
> pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));
>
> 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to a "connector": reading/writing from/to a data source. So, even without the IO "wrapping" (by wrapping, I mean the Read and Write), I think the Exec extension should be in IO as it's a source/sink of data.
>
> Regards
> JB
>
> On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
> > I don't mean to derail the tricky environment questions, but I'm not seeing why this is bundled as an IO rather than a plain DoFn (which can be applied to a PCollection of one or more commands, yielding their outputs). Especially for the case of a Read, which in this case is not splittable (initially or dynamically) and always produces a single element - feels much more like a Map to me.
> >
> > On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> >> Ben - the issues of "things aren't hung, there is a shell command running" - aren't they general to all DoFns? I.e., I don't see why the runner would need to know that a shell command is running, but not that, say, a heavy monolithic computation is running. What's the benefit to the runner in knowing that the DoFn contains a shell command?
> >>
> >> By saying "making sure that all shell commands finish", I suppose you're referring to the possibility of leaks if the user initiates a shell command and forgets to wait for it? I think that should be solvable again without Beam intervention, by making a utility class for running shell commands which implements AutoCloseable, and documenting that you have to use it that way.
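
To make that last point concrete, the utility I have in mind would be shaped roughly like this - a sketch only, the names are made up and none of this is in the PR:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

/** Sketch only: wraps a shell command so it can't leak a subprocess. */
class ShellCommand implements AutoCloseable {
  private final Process process;

  ShellCommand(String... command) throws IOException {
    this.process = new ProcessBuilder(command).start();
  }

  /** Reads the whole stdout, then waits for the command to exit. */
  String waitForStdout() throws IOException, InterruptedException {
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
      // Drain stdout before waiting, to avoid blocking on a full pipe buffer.
      String stdout = reader.lines().collect(Collectors.joining("\n"));
      int exitCode = process.waitFor();
      if (exitCode != 0) {
        throw new IOException("Command exited with code " + exitCode);
      }
      return stdout;
    }
  }

  @Override
  public void close() {
    // Kills the subprocess if the caller forgot (or failed) to wait for it.
    process.destroy();
  }
}

Inside a DoFn you'd then write:

try (ShellCommand cmd = new ShellCommand("/bin/sh", "-c", c.element())) {
  c.output(cmd.waitForStdout());
}

so the subprocess gets cleaned up even if something throws before we wait for it.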
> >>
> >> Ken - I think the question here is: are we ok with a situation where the runner doesn't check or care whether the shell command can run, and the user accepts this risk and studies what commands will be available in the worker environment provided by the runner they use in production, before productionizing a pipeline with those commands.
> >>
> >> Upon some thought I think it's ok. Of course, this carries an obligation for runners to document their worker environment and its changes across versions. Though for many runners such documentation may be trivial: "whatever your YARN cluster has, the runner doesn't change it in any way" - and it may be good enough for users. And for other runners, like Dataflow, such documentation may also be trivial: "no guarantees whatsoever, only what you stage in --filesToStage is available".
> >>
> >> I can also see Beam developing to a point where we'd want all runners to be able to run your DoFn in a user-specified Docker container, and manage those intelligently - but I think that's quite a while away and it doesn't have to block work on a utility for executing shell commands. Though it'd be nice if the utility was forward-compatible with that future world.
> >>
> >> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>
> >>> Hi Eugene,
> >>>
> >>> thanks for the extended questions.
> >>>
> >>> I think we have two levels of expectations here:
> >>> - end-user responsibility
> >>> - worker/runner responsibility
> >>>
> >>> 1/ From an end-user perspective, the end-user has to know that using a system command (via ExecIO), and more generally speaking anything which relies on worker resources (for instance a local filesystem directory available only on a worker), can fail if the expected resource is not present on all workers. So, basically, all workers should have the same topology. That's what I'm assuming for the PR.
> >>> For example, I have my Spark cluster using the same Mesos/Docker setup; then the user knows that all nodes in the cluster will have the same setup and so the same resources (it could be provided by DevOps, for instance).
> >>> On the other hand, running on Dataflow is different because I don't "control" the nodes (bootstrapping or resources), but in that case the user knows it (he knows the runner he's using).
> >>>
> >>> 2/ As you said, we can expect that the runner can deal with some requirements (expressed depending on the pipeline and the runner), and the runner can know which workers provide capabilities matching those requirements.
> >>> Then the end user is no longer responsible: the runner will try to determine whether the pipeline can be executed, and where a DoFn has to be run (on which worker).
> >>>
> >>> For me, it's two different levels, where 2 is smarter but 1 can also make sense.
> >>>
> >>> WDYT ?
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
> >>>> Hi JB,
> >>>>
> >>>> Thanks for bringing this to the mailing list. I also think that this is useful in general (and that use cases for Beam are more than just classic bigdata), and that there are interesting questions here at different levels about how to do it right.
> >>>>
> >>>> I suggest we start with the highest-level question [and discuss the particular API only after agreeing on this, possibly in a separate thread]: how to deal with the fact that Beam gives no guarantees about the environment on workers, e.g. which commands are available, which shell or even OS is being used, etc. Particularly:
> >>>>
> >>>> - Obviously different runners will have a different environment, e.g. Dataflow workers are not going to have Hadoop commands available because they are not running on a Hadoop cluster. So, pipelines and transforms developed using this connector will necessarily be non-portable between different runners. Maybe this is ok? But we need to give users a clear expectation about this. How do we phrase this expectation and where do we put it in the docs?
> >>>>
> >>>> - I'm concerned that this puts additional compatibility requirements on runners - it becomes necessary for a runner to document the environment of its workers (OS, shell, privileges, guaranteed-installed packages, access to other things on the host machine, e.g. whether or not the worker runs in its own container, etc.) and to keep it stable - otherwise transforms and pipelines with this connector won't be portable between runner versions either.
> >>>>
> >>>> Another way to deal with this is to give up and say "the environment on the workers is outside the scope of Beam; consult your runner's documentation or use your best judgment as to what the environment will be, and use this at your own risk".
> >>>>
> >>>> What do others think?
> >>>>
> >>>> On Mon, Dec 5, 2016 at 5:09 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>>>
> >>>> Hi beamers,
> >>>>
> >>>> Today, Beam is mainly focused on data processing. Since the beginning of the project, we have been discussing extending the use-case coverage via DSLs and extensions (like for machine learning), or via IOs.
> >>>>
> >>>> Especially for the IOs, we can see Beam used for data integration and data ingestion.
> >>>>
> >>>> In this area, I'm proposing a first IO: ExecIO:
> >>>>
> >>>> https://issues.apache.org/jira/browse/BEAM-1059
> >>>> https://github.com/apache/incubator-beam/pull/1451
> >>>>
> >>>> Actually, this IO is mainly an ExecFn that executes system commands (again, keep in mind we are discussing data integration/ingestion and not data processing).
> >>>>
> >>>> For convenience, this ExecFn is wrapped in Read and Write (as a regular IO).
> >>>>
> >>>> Clearly, this IO/Fn depends on the worker where it runs. But that's the user's responsibility.
> >>>>
> >>>> During the review, Eugene and I discussed:
> >>>> - is it an IO or just a fn?
> >>>> - is it OK to have worker-specific IO?
> >>>>
> >>>> IMHO, an IO makes a lot of sense and it's very convenient for end users. They can do something like:
> >>>>
> >>>> PCollection<String> output =
> >>>>     pipeline.apply(ExecIO.read().withCommand("/path/to/myscript.sh"));
> >>>>
> >>>> The pipeline will execute myscript and the output PCollection will contain the command execution stdout/stderr.
> >>>>
> >>>> On the other hand, they can do:
> >>>>
> >>>> pcollection.apply(ExecIO.write());
> >>>>
> >>>> where the PCollection contains the commands to execute.
> >>>>
> >>>> Generally speaking, end users can call ExecFn wherever they want in the pipeline steps:
> >>>>
> >>>> PCollection<String> output = commands.apply(ParDo.of(new ExecIO.ExecFn()));
> >>>>
> >>>> The input collection contains the commands to execute, and the output collection contains the commands' execution results (stdout/stderr).
> >>>>
> >>>> Generally speaking, I'm preparing several IOs more in the data integration/ingestion area than in "pure" classic big data processing. I think it would give a new "dimension" to Beam.
> >>>>
> >>>> Thoughts ?
> >>>>
> >>>> Regards
> >>>> JB
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> jbono...@apache.org
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >>>>
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
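
P.S. To make the "plain DoFn" shape Robert describes concrete, it would look roughly like this (a sketch only, not the PR's actual ExecFn; imports omitted; assumes `commands` is a PCollection<String> of command lines):

PCollection<String> outputs = commands.apply(ParDo.of(
    new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        // Run the element as a shell command and emit its stdout.
        Process p = new ProcessBuilder("/bin/sh", "-c", c.element()).start();
        String stdout;
        try (BufferedReader r = new BufferedReader(
            new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
          stdout = r.lines().collect(Collectors.joining("\n"));
        }
        if (p.waitFor() != 0) {
          throw new IOException("Command failed: " + c.element());
        }
        c.output(stdout);
      }
    }));

Not advocating this exact shape - just making the comparison with the IO-style wrapping concrete.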